// ttg.h — PaRSEC backend implementation header for TTG (Template Task Graph).
1 // clang-format off
2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
4 
5 /* set up env if this header was included directly */
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
8 #endif // !defined(TTG_IMPL_NAME)
9 
10 /* Whether to defer a potential writer if there are readers.
11  * This may avoid extra copies in exchange for concurrency.
12  * This may cause deadlocks, so use with caution. */
13 #define TTG_PARSEC_DEFER_WRITER false
14 
15 #include "ttg/config.h"
16 
17 #include "ttg/impl_selector.h"
18 
19 /* include ttg header to make symbols available in case this header is included directly */
20 #include "../../ttg.h"
21 
22 #include "ttg/base/keymap.h"
23 #include "ttg/base/tt.h"
24 #include "ttg/base/world.h"
25 #include "ttg/constraint.h"
26 #include "ttg/edge.h"
27 #include "ttg/execution.h"
28 #include "ttg/func.h"
29 #include "ttg/runtimes.h"
30 #include "ttg/terminal.h"
31 #include "ttg/tt.h"
32 #include "ttg/util/env.h"
33 #include "ttg/util/hash.h"
34 #include "ttg/util/meta.h"
35 #include "ttg/util/meta/callable.h"
36 #include "ttg/util/print.h"
37 #include "ttg/util/scope_exit.h"
38 #include "ttg/util/trace.h"
39 #include "ttg/util/typelist.h"
40 
42 
43 #include "ttg/parsec/fwd.h"
44 
45 #include "ttg/parsec/buffer.h"
48 #include "ttg/parsec/devicefunc.h"
49 #include "ttg/parsec/ttvalue.h"
50 #include "ttg/device/task.h"
51 
52 #include <algorithm>
53 #include <array>
54 #include <cassert>
55 #include <cstring>
56 #include <experimental/type_traits>
57 #include <functional>
58 #include <future>
59 #include <iostream>
60 #include <list>
61 #include <map>
62 #include <memory>
63 #include <mutex>
64 #include <numeric>
65 #include <sstream>
66 #include <string>
67 #include <tuple>
68 #include <vector>
69 
70 // needed for MPIX_CUDA_AWARE_SUPPORT
71 #if defined(TTG_HAVE_MPI)
72 #include <mpi.h>
73 #if defined(TTG_HAVE_MPIEXT)
74 #include <mpi-ext.h>
75 #endif // TTG_HAVE_MPIEXT
76 #endif // TTG_HAVE_MPI
77 
78 
79 #include <parsec.h>
80 #include <parsec/class/parsec_hash_table.h>
81 #include <parsec/data_internal.h>
82 #include <parsec/execution_stream.h>
83 #include <parsec/interfaces/interface.h>
84 #include <parsec/mca/device/device.h>
85 #include <parsec/parsec_comm_engine.h>
86 #include <parsec/parsec_internal.h>
87 #include <parsec/scheduling.h>
88 #include <parsec/remote_dep.h>
89 
90 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
91 #include <parsec/mca/device/cuda/device_cuda.h>
92 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
94 #include <parsec/mca/device/hip/device_hip.h>
95 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
96 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
97 #include <parsec/mca/device/level_zero/device_level_zero.h>
98 #endif //PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 
100 #include <parsec/mca/device/device_gpu.h>
101 #if defined(PARSEC_PROF_TRACE)
102 #include <parsec/profiling.h>
103 #undef PARSEC_TTG_PROFILE_BACKEND
104 #if defined(PARSEC_PROF_GRAPHER)
105 #include <parsec/parsec_prof_grapher.h>
106 #endif
107 #endif
108 #include <cstdlib>
109 #include <cstring>
110 
111 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
112 #include <unordered_set>
113 #endif
114 
116 #include "ttg/parsec/thread_local.h"
117 #include "ttg/parsec/ptr.h"
118 #include "ttg/parsec/task.h"
119 #include "ttg/parsec/parsec-ext.h"
120 
121 #include "ttg/device/device.h"
122 
123 #include <boost/type_index.hpp>
124 
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
126 
/* PaRSEC function declarations */
// C-linkage declarations for PaRSEC runtime entry points used by this backend
// (defined inside the PaRSEC library, not in TTG).
extern "C" {
// Invoked by the termination-detection module once a taskpool has quiesced
// (installed via monitor_taskpool() in WorldImpl::create_tpool()).
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp);
// Adjusts a taskpool's runtime-task counter; installed as
// tpool->update_nb_runtime_task in WorldImpl::create_tpool().
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks);
}
132 
133 namespace ttg_parsec {
  /// Callback type used to deliver a serialized active message to a TT:
  /// (message bytes, message size, target TT instance).
  typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
  /// An unpack callback paired with the TT instance it targets.
  typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
  /// Maps a TT's unique op id to its unpack callback + instance;
  /// accessed under static_map_mutex (see static_unpack_msg).
  inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
  /// Guards static_id_to_op_map and delayed_unpack_actions.
  inline std::mutex static_map_mutex;
  /// (source rank, heap-allocated copy of the message, message size)
  typedef std::tuple<int, void *, size_t> static_set_arg_fct_arg_t;
  /// Messages that arrived before their target TT was registered;
  /// keyed by op id for later replay (see static_unpack_msg).
  inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;
140 
  /// Header prepended to every TTG active message (see msg_t).
  /// NOTE(review): the enumerator list of fn_id appears to have been lost in
  /// this extraction (the enum opens but its values and closing brace are not
  /// visible here) — verify against the upstream header.
  struct msg_header_t {
  typedef enum fn_id : std::int8_t {
  uint32_t taskpool_id = std::numeric_limits<uint32_t>::max();  // id of the destination taskpool
  uint64_t op_id = std::numeric_limits<uint64_t>::max();        // id of the destination TT
  std::size_t key_offset = 0;
  std::int8_t num_iovecs = 0;
  bool inline_data = false;
  int32_t param_id = -1;  // destination parameter/terminal, -1 if unset
  int num_keys = 0;
  int sender = -1;        // source rank, -1 if unset
 
  msg_header_t() = default;
 
  // Fills the routing metadata; remaining fields keep their defaults.
  msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
      : fn_id(fid)
      , taskpool_id(tid)
      , op_id(oid)
      , param_id(pid)
      , num_keys(nk)
      , sender(sender)
  { }
  };
169 
170  static void unregister_parsec_tags(void *_);
171 
172  namespace detail {
173 
  /// Maximum size (bytes) of one TTG active message, header included.
  constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1 * 1024*1024;
 
  /// Wire format of a TTG active message: a msg_header_t followed by payload.
  /// NOTE(review): the declaration of the tt_id header member appears to be
  /// missing from this extraction (the constructor initializes tt_id) —
  /// verify against the upstream header.
  struct msg_t {
  static constexpr std::size_t max_payload_size = PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t);
  unsigned char bytes[max_payload_size];
 
  msg_t() = default;
  // Forwards all routing metadata into the header; the payload in `bytes`
  // is filled in by the caller afterwards.
  msg_t(uint64_t tt_id,
        uint32_t taskpool_id,
        msg_header_t::fn_id_t fn_id,
        int32_t param_id,
        int sender,
        int num_keys = 1)
      : tt_id(fn_id, taskpool_id, tt_id, param_id, sender, num_keys)
  {}
  };
191 
193 
194  static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
195  int src_rank, void *obj) {
196  static_set_arg_fct_type static_set_arg_fct;
197  parsec_taskpool_t *tp = NULL;
198  msg_header_t *msg = static_cast<msg_header_t *>(data);
199  uint64_t op_id = msg->op_id;
200  tp = parsec_taskpool_lookup(msg->taskpool_id);
201  assert(NULL != tp);
202  static_map_mutex.lock();
203  try {
204  auto op_pair = static_id_to_op_map.at(op_id);
205  static_map_mutex.unlock();
206  tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
207  static_set_arg_fct = op_pair.first;
208  static_set_arg_fct(data, size, op_pair.second);
209  tp->tdm.module->incoming_message_end(tp, NULL);
210  return 0;
211  } catch (const std::out_of_range &e) {
212  void *data_cpy = malloc(size);
213  assert(data_cpy != 0);
214  memcpy(data_cpy, data, size);
215  ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
216  ", ", op_id, ", ", data_cpy, ", ", size, ")");
217  delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, data_cpy, size)));
218  static_map_mutex.unlock();
219  return 1;
220  }
221  }
222 
223  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
224  int src, void *cb_data);
225 
226  inline bool &initialized_mpi() {
227  static bool im = false;
228  return im;
229  }
230 
232 
233  } // namespace detail
234 
  // NOTE(review): the enclosing class declaration (WorldImpl) appears to have
  // been lost in extraction immediately above these members — verify upstream.
  ttg::Edge<> m_ctl_edge;   //!< control edge, exposed via ctl_edge()
  bool _dag_profiling;      //!< whether DAG (grapher) tracing is active, see dag_on()/dag_off()
  bool _task_profiling;     //!< whether task profiling is active, see profile_on()/profile_off()
  // Per-execution-space MPI awareness; Host defaults to true, CUDA/HIP are
  // probed in the constructor.
  std::array<bool, static_cast<std::size_t>(ttg::ExecutionSpace::Invalid)>
      mpi_space_support = {true, false, false};
241 
242  int query_comm_size() {
243  int comm_size;
244  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
245  return comm_size;
246  }
247 
248  int query_comm_rank() {
249  int comm_rank;
250  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
251  return comm_rank;
252  }
253 
  // Comm-engine "up" callback: (re)registers the two TTG active-message tags —
  // message delivery (static_unpack_msg, up to PARSEC_TTG_MAX_AM_SIZE bytes)
  // and RMA completion (get_remote_complete_cb, up to 128 bytes).
  static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine, void *user_data)
  {
    parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, user_data, detail::PARSEC_TTG_MAX_AM_SIZE);
    parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, user_data, 128);
  }
259 
  // Comm-engine "down" callback: unregisters both TTG active-message tags
  // (counterpart of ttg_parsec_ce_up).
  static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine, void *user_data)
  {
    parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
    parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
  }
265 
266  public:
267 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
268  int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
269  int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
270  int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
271 #endif
272 
  /// Constructs the PaRSEC-backed world.
  /// \param argc,argv command line, forwarded to parsec_init when we own the context
  /// \param ncores number of cores requested from parsec_init
  /// \param c an existing PaRSEC context to attach to, or nullptr to create (and own) one
  WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
      : WorldImplBase(query_comm_size(), query_comm_rank())
      , ctx(c)
      , own_ctx(c == nullptr)
#if defined(PARSEC_PROF_TRACE)
      , profiling_array(nullptr)
      , profiling_array_size(0)
#endif
      , _dag_profiling(false)
      , _task_profiling(false)
  {
  if (own_ctx) ctx = parsec_init(ncores, argc, argv);
 
  /* query MPI device support */
  // NOTE(review): the opening `if (...` of the two conditionals below appears
  // to have been lost in extraction — verify against the upstream header.
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
      || MPIX_Query_cuda_support()
#endif // MPIX_CUDA_AWARE_SUPPORT
      ) {
    // CUDA-aware MPI detected
    mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::CUDA)] = true;
  }
 
#if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
      || MPIX_Query_hip_support()
#endif // MPIX_HIP_AWARE_SUPPORT
      ) {
    // HIP-aware MPI detected
    mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::HIP)] = true;
  }
 
#if defined(PARSEC_PROF_TRACE)
  // Honor PaRSEC's global profiling switch and register backend keywords.
  if(parsec_profile_enabled) {
    profile_on();
#if defined(PARSEC_TTG_PROFILE_BACKEND)
    parsec_profiling_add_dictionary_keyword("PARSEC_TTG_SET_ARG_IMPL", "fill:000000", 0, NULL,
                                            (int*)&parsec_ttg_profile_backend_set_arg_start,
                                            (int*)&parsec_ttg_profile_backend_set_arg_end);
    parsec_profiling_add_dictionary_keyword("PARSEC_TTG_BCAST_ARG_IMPL", "fill:000000", 0, NULL,
                                            (int*)&parsec_ttg_profile_backend_bcast_arg_start,
                                            (int*)&parsec_ttg_profile_backend_bcast_arg_end);
    parsec_profiling_add_dictionary_keyword("PARSEC_TTG_DATACOPY", "fill:000000",
                                            sizeof(size_t), "size{int64_t}",
                                            (int*)&parsec_ttg_profile_backend_allocate_datacopy,
                                            (int*)&parsec_ttg_profile_backend_free_datacopy);
#endif
  }
#endif
 
  // Register the TTG AM tags if the comm engine is already up.
  if( NULL != parsec_ce.tag_register) {
    parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, detail::PARSEC_TTG_MAX_AM_SIZE);
    parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128);
  }
 
  create_tpool();
  }
329 
330 
  /// \return the PaRSEC context this world runs on
  auto *context() { return ctx; }
  /// \return the calling thread's PaRSEC execution stream
  auto *execution_stream() { return parsec_my_execution_stream(); }
  /// \return the current taskpool (recreated at every fence)
  auto *taskpool() { return tpool; }
334 
  /// Allocates and configures a fresh taskpool: reserves an id, enables all
  /// available devices, attaches a termination-detection module, and enables
  /// the taskpool. Collective: ends with a barrier on comm().
  void create_tpool() {
    assert(nullptr == tpool);
    tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
    tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
    tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task;
    tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
    tpool->taskpool_name = strdup("TTG Taskpool");
    parsec_taskpool_reserve_id(tpool);
 
    /* enable every device PaRSEC knows about for this taskpool */
    tpool->devices_index_mask = 0;
    for(int i = 0; i < (int)parsec_nb_devices; i++) {
      parsec_device_module_t *device = parsec_mca_device_get(i);
      if( NULL == device ) continue;
      tpool->devices_index_mask |= (1 << device->device_index);
    }
 
#ifdef TTG_USE_USER_TERMDET
    parsec_termdet_open_module(tpool, "user_trigger");
#else // TTG_USE_USER_TERMDET
    parsec_termdet_open_dyn_module(tpool);
#endif // TTG_USE_USER_TERMDET
    tpool->tdm.module->monitor_taskpool(tpool, parsec_taskpool_termination_detected);
    // In TTG, we use the pending actions to denote that the
    // taskpool is not ready, i.e. some local tasks could still
    // be added by the main thread. It should then be initialized
    // to 0, execute will set it to 1 and mark the tpool as ready,
    // and the fence() will decrease it back to 0.
    tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
    parsec_taskpool_enable(tpool, NULL, NULL, execution_stream(), size() > 1);
 
#if defined(PARSEC_PROF_TRACE)
    // Reattach the persistent profiling dictionary (kept across fences).
    tpool->profiling_array = profiling_array;
#endif
 
    // Termination detection in PaRSEC requires to synchronize the
    // taskpool enabling, to avoid a race condition that would keep
    // termination detection-related messages in a waiting queue
    // forever
    MPI_Barrier(comm());
 
    parsec_taskpool_started = false;
  }
377 
  /* A world wraps process-wide runtime state; it is neither copyable nor movable. */
 
  /* Deleted copy ctor */
  WorldImpl(const WorldImpl &other) = delete;
 
  /* Deleted move ctor */
  WorldImpl(WorldImpl &&other) = delete;
 
  /* Deleted copy assignment */
  WorldImpl &operator=(const WorldImpl &other) = delete;
 
  /* Deleted move assignment */
  WorldImpl &operator=(WorldImpl &&other) = delete;
389 
391 
  /// Active-message tag for TTG message delivery (handled by static_unpack_msg).
  static constexpr int parsec_ttg_tag() { return PARSEC_DSL_TTG_TAG; }
  /// Active-message tag for RMA completion (handled by get_remote_complete_cb).
  static constexpr int parsec_ttg_rma_tag() { return PARSEC_DSL_TTG_RMA_TAG; }
 
  /// \return the communicator spanning this world (currently always MPI_COMM_WORLD)
  MPI_Comm comm() const { return MPI_COMM_WORLD; }
396 
  /// Starts execution: enqueues the taskpool into the context, marks the
  /// taskpool ready (adds the single runtime action that fence()/destroy()
  /// later remove), and starts the context. No-op if already started.
  virtual void execute() override {
    if (!parsec_taskpool_started) {
      parsec_enqueue(ctx, tpool);
      tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
      tpool->tdm.module->taskpool_ready(tpool);
      [[maybe_unused]] auto ret = parsec_context_start(ctx);
      // ignore ret since all of its nonzero values are OK (e.g. -1 due to ctx already being active)
      parsec_taskpool_started = true;
    }
  }
407 
  /// Detaches the profiling array, stops termination-detection monitoring,
  /// and frees the current taskpool (counterpart of create_tpool()).
  void destroy_tpool() {
#if defined(PARSEC_PROF_TRACE)
    // We don't want to release the profiling array, as it should be persistent
    // between fences() to allow defining a TT/TTG before a fence() and schedule
    // it / complete it after a fence()
    tpool->profiling_array = nullptr;
#endif
    assert(NULL != tpool->tdm.monitor);
    tpool->tdm.module->unmonitor_taskpool(tpool);
    parsec_taskpool_free(tpool);
    tpool = nullptr;
  }
420 
  /// Tears down the world: drains outstanding work (if execute() ran),
  /// releases TTs, frees the taskpool, unregisters the AM tags, frees the
  /// profiling array, and finalizes the context if we own it. Idempotent:
  /// mark_invalid() ensures the body runs at most once.
  virtual void destroy() override {
    if (is_valid()) {
      if (parsec_taskpool_started) {
        // We are locally ready (i.e. we won't add new tasks)
        tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
        ttg::trace("ttg_parsec(", this->rank(), "): final waiting for completion");
        if (own_ctx)
          parsec_context_wait(ctx);
        else
          parsec_taskpool_wait(tpool);
      }
      release_ops();
      destroy_tpool();
      if (own_ctx) {
        // we own the context, so it is safe to unregister the tags right away
        unregister_parsec_tags(nullptr);
      } else {
        // defer tag unregistration to the finalization of the borrowed context
        parsec_context_at_fini(unregister_parsec_tags, nullptr);
      }
#if defined(PARSEC_PROF_TRACE)
      if(nullptr != profiling_array) {
        free(profiling_array);
        profiling_array = nullptr;
        profiling_array_size = 0;
      }
#endif
      if (own_ctx) parsec_fini(&ctx);
      mark_invalid();
    }
  }
451 
  /// \return the world-wide control edge
  ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
 
  /// \return the world-wide control edge (const)
  const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
 
  /// Accounts for one newly created local task in the termination detector.
  void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); }
 
  /// Accounts for one message in flight (keeps the taskpool alive until it lands).
  void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), 1); }
  /// Retires one in-flight message.
  void decrement_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), -1); }
 
  /// \return whether DAG (grapher) tracing is currently active
  bool dag_profiling() override { return _dag_profiling; }
462 
463  virtual void dag_on(const std::string &filename) override {
464 #if defined(PARSEC_PROF_GRAPHER)
465  if(!_dag_profiling) {
466  profile_on();
467  size_t len = strlen(filename.c_str())+32;
468  char ext_filename[len];
469  snprintf(ext_filename, len, "%s-%d.dot", filename.c_str(), rank());
470  parsec_prof_grapher_init(ctx, ext_filename);
471  _dag_profiling = true;
472  }
473 #else
474  ttg::print("Error: requested to create '", filename, "' to create a DAG of tasks,\n"
475  "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
476 #endif
477  }
478 
  /// Disables DAG tracing and finalizes the grapher output.
  /// No-op if DAG profiling is not active or PaRSEC lacks grapher support.
  virtual void dag_off() override {
#if defined(PARSEC_PROF_GRAPHER)
    if(_dag_profiling) {
      parsec_prof_grapher_fini();
      _dag_profiling = false;
    }
#endif
  }
487 
  /// Disables task profiling (no-op unless built with PARSEC_PROF_TRACE).
  virtual void profile_off() override {
#if defined(PARSEC_PROF_TRACE)
    _task_profiling = false;
#endif
  }
 
  /// Enables task profiling (no-op unless built with PARSEC_PROF_TRACE).
  virtual void profile_on() override {
#if defined(PARSEC_PROF_TRACE)
    _task_profiling = true;
#endif
  }
 
  /// \return whether task profiling is currently active
  virtual bool profiling() override { return _task_profiling; }
501 
503  return mpi_space_support[static_cast<std::size_t>(space)];
504  }
505 
  /// Signals the user-triggered termination detector that no more tasks will
  /// be created (sets the task count to 0). Only meaningful with
  /// TTG_USE_USER_TERMDET; otherwise a no-op.
  virtual void final_task() override {
#ifdef TTG_USE_USER_TERMDET
    if(parsec_taskpool_started) {
      taskpool()->tdm.module->taskpool_set_nb_tasks(taskpool(), 0);
      parsec_taskpool_started = false;
    }
#endif // TTG_USE_USER_TERMDET
  }
514 
515  template <typename keyT, typename output_terminalsT, typename derivedT,
516  typename input_valueTs = ttg::typelist<>, ttg::ExecutionSpace Space>
518 #if defined(PARSEC_PROF_TRACE)
519  std::stringstream ss;
520  build_composite_name_rec(t->ttg_ptr(), ss);
521  ss << t->get_name();
522  register_new_profiling_event(ss.str().c_str(), t->get_instance_id());
523 #endif
524  }
525 
526  protected:
527 #if defined(PARSEC_PROF_TRACE)
  // Recursively prepends the names of all enclosing TTGs, outermost first,
  // producing "outer::inner::..." into ss; stops at a null parent.
  void build_composite_name_rec(const ttg::TTBase *t, std::stringstream &ss) {
    if(nullptr == t)
      return;
    build_composite_name_rec(t->ttg_ptr(), ss);
    ss << t->get_name() << "::";
  }
534 
  // Registers a profiling dictionary keyword for the TT at instance id
  // `position`. Each TT occupies two consecutive ints (start/end event ids)
  // in profiling_array; the array grows in 64-int chunks, zero-filled, and is
  // re-shared with the taskpool after every reallocation.
  void register_new_profiling_event(const char *name, int position) {
    if(2*position >= profiling_array_size) {
      size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
      profiling_array = (int*)realloc((void*)profiling_array,
                                      new_profiling_array_size * sizeof(int));
      memset((void*)&profiling_array[profiling_array_size], 0, sizeof(int)*(new_profiling_array_size - profiling_array_size));
      profiling_array_size = new_profiling_array_size;
      tpool->profiling_array = profiling_array;
    }
 
    // A slot must not be registered twice (zero means unused).
    assert(0 == tpool->profiling_array[2*position]);
    assert(0 == tpool->profiling_array[2*position+1]);
    // TODO PROFILING: 0 and NULL should be replaced with something that depends on the key human-readable serialization...
    // Typically, we would put something like 3*sizeof(int32_t), "m{int32_t};n{int32_t};k{int32_t}" to say
    // there are three fields, named m, n and k, stored in this order, and each of size int32_t
    parsec_profiling_add_dictionary_keyword(name, "fill:000000", 64, "key{char[64]}",
                                            (int*)&tpool->profiling_array[2*position],
                                            (int*)&tpool->profiling_array[2*position+1]);
  }
554 #endif
555 
  /// Collective fence: waits until all tasks (local and remote) complete,
  /// then recreates a fresh taskpool and restarts execution so that more work
  /// can be submitted afterwards. Degenerates to a plain barrier if execute()
  /// was never called.
  virtual void fence_impl(void) override {
    int rank = this->rank();
    if (!parsec_taskpool_started) {
      ttg::trace("ttg_parsec::(", rank, "): parsec taskpool has not been started, fence is a simple MPI_Barrier");
      MPI_Barrier(comm());
      return;
    }
    ttg::trace("ttg_parsec::(", rank, "): parsec taskpool is ready for completion");
    // We are locally ready (i.e. we won't add new tasks)
    tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
    ttg::trace("ttg_parsec(", rank, "): waiting for completion");
    parsec_taskpool_wait(tpool);
 
    // We need the synchronization between the end of the context and the restart of the taskpool
    // until we use parsec_taskpool_wait and implement an epoch in the PaRSEC taskpool
    // see Issue #118 (TTG)
    MPI_Barrier(comm());
 
    destroy_tpool();
    create_tpool();
    execute();
  }
578 
 private:
  parsec_context_t *ctx = nullptr;       //!< PaRSEC context (created here iff own_ctx)
  bool own_ctx = false;                  //!< whether I own the context
  parsec_taskpool_t *tpool = nullptr;    //!< current taskpool, recreated at every fence
  bool parsec_taskpool_started = false;  //!< set by execute(), cleared by final_task()
#if defined(PARSEC_PROF_TRACE)
  int *profiling_array;                  //!< persistent profiling dictionary storage shared with tpool
  std::size_t profiling_array_size;      //!< capacity (in ints) of profiling_array
#endif
  };
589 
  /// Unregisters both TTG active-message tags, if the comm engine supports
  /// unregistration. Callable directly or installed as a
  /// parsec_context_at_fini callback; the argument is unused.
  static void unregister_parsec_tags(void *_pidx)
  {
    if(NULL != parsec_ce.tag_unregister) {
      parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
      parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
    }
  }
597 
598  namespace detail {
599 
  // Generic task-class parameter symbols exposing two hash words (HASH0/1)
  // and two key words (KEY0/1) of a TTG task to PaRSEC, at context indices
  // 0-3. All are standalone globals with no range or increment expressions.
  const parsec_symbol_t parsec_taskclass_param0 = {
      .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
      .name = "HASH0",
      .context_index = 0,
      .min = nullptr,
      .max = nullptr,
      .expr_inc = nullptr,
      .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param1 = {
      .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
      .name = "HASH1",
      .context_index = 1,
      .min = nullptr,
      .max = nullptr,
      .expr_inc = nullptr,
      .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param2 = {
      .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
      .name = "KEY0",
      .context_index = 2,
      .min = nullptr,
      .max = nullptr,
      .expr_inc = nullptr,
      .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param3 = {
      .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
      .name = "KEY1",
      .context_index = 3,
      .min = nullptr,
      .max = nullptr,
      .expr_inc = nullptr,
      .cst_inc = 0 };
632 
634  ttg_data_copy_t *res = nullptr;
635  if (task == nullptr || ptr == nullptr) {
636  return res;
637  }
638  for (int i = 0; i < task->data_count; ++i) {
639  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
640  if (NULL != copy && copy->get_ptr() == ptr) {
641  res = copy;
642  break;
643  }
644  }
645  return res;
646  }
647 
648  inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
649  int i = -1;
650  if (task == nullptr || ptr == nullptr) {
651  return i;
652  }
653  for (i = 0; i < task->data_count; ++i) {
654  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
655  if (NULL != copy && copy->get_ptr() == ptr) {
656  return i;
657  }
658  }
659  return -1;
660  }
661 
663  if (task == nullptr || copy == nullptr) {
664  return false;
665  }
666 
667  if (MAX_PARAM_COUNT < task->data_count) {
668  throw std::logic_error("Too many data copies, check MAX_PARAM_COUNT!");
669  }
670 
671  task->copies[task->data_count] = copy;
672  task->data_count++;
673  return true;
674  }
675 
677  int i;
678  /* find and remove entry; copies are usually appended and removed, so start from back */
679  for (i = task->data_count-1; i >= 0; --i) {
680  if (copy == task->copies[i]) {
681  break;
682  }
683  }
684  if (i < 0) return;
685  /* move all following elements one up */
686  for (; i < task->data_count - 1; ++i) {
687  task->copies[i] = task->copies[i + 1];
688  }
689  /* null last element */
690  task->copies[i] = nullptr;
691  task->data_count--;
692  }
693 
694 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
695 #warning "ttg::PaRSEC enables data copy tracking"
696  static std::unordered_set<ttg_data_copy_t *> pending_copies;
697  static std::mutex pending_copies_mutex;
698 #endif
699 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
700  static int64_t parsec_ttg_data_copy_uid = 0;
701 #endif
702 
  /// Creates a new data copy holding \p value (moved or copied as permitted
  /// by Value's reference category).
  /// If the decayed value type derives from ttg::TTValue it is constructed
  /// directly (it then appears to serve as its own ttg_data_copy_t);
  /// otherwise the value is wrapped in a ttg_data_value_copy_t.
  /// \throws std::logic_error if neither construction is possible.
  template <typename Value>
  inline ttg_data_copy_t *create_new_datacopy(Value &&value) {
    using value_type = std::decay_t<Value>;
    ttg_data_copy_t *copy = nullptr;
    if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type> &&
                  std::is_constructible_v<value_type, decltype(value)>) {
      copy = new value_type(std::forward<Value>(value));
    } else if constexpr (std::is_constructible_v<ttg_data_value_copy_t<value_type>, decltype(value)>) {
      copy = new ttg_data_value_copy_t<value_type>(std::forward<Value>(value));
    } else {
      /* we have no way to create a new copy from this value */
      throw std::logic_error("Trying to copy-construct data that is not copy-constructible!");
    }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    // Keep track of additional memory usage
    if(ttg::default_execution_context().impl().profiling()) {
      copy->size = sizeof(Value);
      copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
      parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_allocate_datacopy,
                                      static_cast<uint64_t>(copy->uid),
                                      PROFILE_OBJECT_ID_NULL, &copy->size,
                                      PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
    }
#endif
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
    {
      // Record the live copy for leak detection (erased in release_data_copy).
      const std::lock_guard<std::mutex> lock(pending_copies_mutex);
      auto rc = pending_copies.insert(copy);
      assert(std::get<1>(rc));
    }
#endif
    return copy;
  }
736 
737 #if 0
738  template <std::size_t... IS, typename Key = keyT>
739  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
740  int junk[] = {0, (invoke_pull_terminal<IS>(
741  std::get<IS>(input_terminals), key, task),
742  0)...};
743  junk[0]++;
744  }
745 #endif // 0
746 
747  template<typename TT, std::size_t I>
748  inline void transfer_ownership_impl(ttg_data_copy_t *copy, int device) {
749  if constexpr(!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
750  copy->transfer_ownership(PARSEC_FLOW_ACCESS_RW, device);
751  copy->inc_current_version();
752  }
753  }
754 
755  template<typename TT, std::size_t... Is>
756  inline void transfer_ownership(parsec_ttg_task_t<TT> *me, int device, std::index_sequence<Is...>) {
757  /* transfer ownership of each data */
758  int junk[] = {0, (transfer_ownership_impl<TT, Is>(me->copies[Is], device), 0)...};
759  junk[0]++;
760  }
761 
  /// Host execution hook: transfers ownership of mutable inputs to device
  /// index 0 and invokes the TT's host operator.
  template<typename TT>
  inline parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
    if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
      transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
    }
    return me->template invoke_op<ttg::ExecutionSpace::Host>();
  }
770 
771  template<typename TT>
772  inline parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
773  if constexpr(TT::derived_has_cuda_op()) {
774  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
775  return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
776  } else {
777  std::cerr << "CUDA hook called without having a CUDA op!" << std::endl;
778  return PARSEC_HOOK_RETURN_ERROR;
779  }
780  }
781 
782  template<typename TT>
783  inline parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
784  if constexpr(TT::derived_has_hip_op()) {
785  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
786  return me->template invoke_op<ttg::ExecutionSpace::HIP>();
787  } else {
788  std::cerr << "HIP hook called without having a HIP op!" << std::endl;
789  return PARSEC_HOOK_RETURN_ERROR;
790  }
791  }
792 
793  template<typename TT>
794  inline parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
795  if constexpr(TT::derived_has_level_zero_op()) {
796  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
797  return me->template invoke_op<ttg::ExecutionSpace::L0>();
798  } else {
799  std::cerr << "L0 hook called without having a L0 op!" << std::endl;
800  return PARSEC_HOOK_RETURN_ERROR;
801  }
802  }
803 
804 
805  template<typename TT>
806  inline parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task) {
807  if constexpr(TT::derived_has_cuda_op()) {
808  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
809  return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
810  } else {
811  return PARSEC_HOOK_RETURN_NEXT;
812  }
813  }
814 
815  template<typename TT>
816  inline parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task) {
817  if constexpr(TT::derived_has_hip_op()) {
818  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
819  return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
820  } else {
821  return PARSEC_HOOK_RETURN_NEXT;
822  }
823  }
824 
825  template<typename TT>
826  inline parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task) {
827  if constexpr(TT::derived_has_level_zero_op()) {
828  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
829  return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
830  } else {
831  return PARSEC_HOOK_RETURN_NEXT;
832  }
833  }
834 
835 
836  template <typename KeyT, typename ActivationCallbackT>
838  std::vector<KeyT> _keylist;
839  std::atomic<int> _outstanding_transfers;
840  ActivationCallbackT _cb;
842 
843  public:
844  rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
845  : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
846 
847  bool complete_transfer(void) {
848  int left = --_outstanding_transfers;
849  if (0 == left) {
850  _cb(std::move(_keylist), _copy);
851  return true;
852  }
853  return false;
854  }
855  };
856 
  /// RMA-get completion callback (origin side): unregisters the local memory
  /// handle and signals the activation object that one transfer finished;
  /// deletes the activation once all of its transfers have completed.
  template <typename ActivationT>
  static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
                             parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
                             void *cb_data) {
    parsec_ce.mem_unregister(&lreg);
    ActivationT *activation = static_cast<ActivationT *>(cb_data);
    if (activation->complete_transfer()) {
      delete activation;  // last outstanding transfer for this activation
    }
    return PARSEC_SUCCESS;
  }
868 
  /// AM callback for WorldImpl::parsec_ttg_rma_tag(): the message payload is
  /// the address (as std::intptr_t) of a heap-allocated
  /// std::function<void()> on this rank; invoke it, then free it.
  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
                                    int src, void *cb_data) {
    std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
    std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
    (*fn)();
    delete fn;
    return PARSEC_SUCCESS;
  }
877 
  /// Generic variant of get_remote_complete_cb: the message payload is the
  /// address of a heap-allocated callable of type FuncT; invoke it, then
  /// free it. The sender must have encoded a FuncT* matching this
  /// instantiation.
  template <typename FuncT>
  static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
                                           int src, void *cb_data) {
    std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
    FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
    (*fn_ptr)();
    delete fn_ptr;
    return PARSEC_SUCCESS;
  }
887 
  /// Releases the reference to \p copy held by a completing task.
  /// Decrements the reader count; when the last reader drops (or a mutable
  /// copy has no consumers), either releases a deferred writer task that was
  /// waiting for exclusive access, or — if no successor task and no other
  /// reference remains — deletes the copy.
  inline void release_data_copy(ttg_data_copy_t *copy) {
    if (copy->is_mutable() && nullptr == copy->get_next_task()) {
      /* current task mutated the data but there are no consumers so prepare
       * the copy to be freed below */
      copy->reset_readers();
    }
 
    int32_t readers = copy->num_readers();
    if (readers > 1) {
      /* potentially more than one reader, decrement atomically */
      readers = copy->decrement_readers();
    } else if (readers == 1) {
      /* make sure readers drop to zero */
      readers = copy->decrement_readers<false>();
    }
    /* if there was only one reader (the current task) or
     * a mutable copy and a successor, we release the copy */
    if (1 == readers || readers == copy->mutable_tag) {
      // synchronize with writers before inspecting/handing off the copy
      std::atomic_thread_fence(std::memory_order_acquire);
      if (nullptr != copy->get_next_task()) {
        /* Release the deferred task.
         * The copy was mutable and will be mutated by the released task,
         * so simply transfer ownership.
         */
        parsec_task_t *next_task = copy->get_next_task();
        copy->set_next_task(nullptr);
        parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
        copy->mark_mutable();
        deferred_op->release_task();
      } else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
        /* we are the last reference, delete the copy */
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
        {
          // remove from the leak-tracking set added in create_new_datacopy
          const std::lock_guard<std::mutex> lock(pending_copies_mutex);
          size_t rc = pending_copies.erase(copy);
          assert(1 == rc);
        }
#endif
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
        // Keep track of additional memory usage
        if(ttg::default_execution_context().impl().profiling()) {
          parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_free_datacopy,
                                          static_cast<uint64_t>(copy->uid),
                                          PROFILE_OBJECT_ID_NULL, &copy->size,
                                          PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
        }
#endif
        delete copy;
      }
    }
  }
939 
// Register a task as a user (reader or writer) of an existing data copy and
// return the copy the task should actually use — either the original, or a
// freshly created private copy when the original is (about to be) mutated.
// NOTE(review): source line 941 (the function signature) is missing from this
// extract; from the body it evidently takes the input copy (`copy_in`), the
// requesting task (`task`) and a `readonly` flag — confirm against the repo.
940  template <typename Value>
942  ttg_data_copy_t *copy_res = copy_in;
943  bool replace = false;
944  int32_t readers = copy_in->num_readers();
945  assert(readers != 0);
946 
947  /* try hard to defer writers if we cannot make copies
948  * if deferral fails we have to bail out */
949  bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->defer_writer;
950 
951  if (readonly && !copy_in->is_mutable()) {
952  /* simply increment the number of readers */
953  readers = copy_in->increment_readers();
954  }
955 
956  if (readers == copy_in->mutable_tag) {
957  if (copy_res->get_next_task() != nullptr) {
958  if (readonly) {
959  parsec_ttg_task_base_t *next_task = reinterpret_cast<parsec_ttg_task_base_t *>(copy_res->get_next_task());
960  if (next_task->defer_writer) {
961  /* there is a writer but it signalled that it wants to wait for readers to complete */
962  return copy_res;
963  }
964  }
965  }
966  /* someone is going to write into this copy -> we need to make a copy */
967  copy_res = NULL;
968  if (readonly) {
969  /* we replace the copy in a deferred task if the copy will be mutated by
970  * the deferred task and we are readonly.
971  * That way, we can share the copy with other readonly tasks and release
972  * the deferred task. */
973  replace = true;
974  }
975  } else if (!readonly) {
976  /* this task will mutate the data
977  * check whether there are other readers already and potentially
978  * defer the release of this task to give following readers a
979  * chance to make a copy of the data before this task mutates it
980  *
981  * Try to replace the readers with a negative value that indicates
982  * the value is mutable. If that fails we know that there are other
983  * readers or writers already.
984  *
985  * NOTE: this check is not atomic: either there is a single reader
986  * (current task) or there are others, in which we case won't
987  * touch it.
988  */
989  if (1 == copy_in->num_readers() && !defer_writer) {
// NOTE(review): source lines 990-993 are missing from this extract
// (presumably a comment block) — the visible code registers this task as the
// exclusive writer of the copy.
994  assert(nullptr == copy_in->get_next_task());
995  copy_in->set_next_task(&task->parsec_task);
// release fence pairs with the acquire fence in release_data_copy()
996  std::atomic_thread_fence(std::memory_order_release);
997  copy_in->mark_mutable();
998  } else {
999  if (defer_writer && nullptr == copy_in->get_next_task()) {
1000  /* we're the first writer and want to wait for all readers to complete */
1001  copy_res->set_next_task(&task->parsec_task);
1002  task->defer_writer = true;
1003  } else {
1004  /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */
1005  copy_res = NULL;
1006  }
1007  }
1008  }
1009 
1010  if (NULL == copy_res) {
1011  // can only make a copy if Value is copy-constructible ... so this codepath should never be hit
1012  if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1013  ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast<Value *>(copy_in->get_ptr()));
1014  if (replace && nullptr != copy_in->get_next_task()) {
1015  /* replace the task that was deferred */
1016  parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
1017  new_copy->mark_mutable();
1018  /* replace the copy in the deferred task */
1019  for (int i = 0; i < deferred_op->data_count; ++i) {
1020  if (deferred_op->copies[i] == copy_in) {
1021  deferred_op->copies[i] = new_copy;
1022  break;
1023  }
1024  }
1025  copy_in->set_next_task(nullptr);
1026  deferred_op->release_task();
1027  copy_in->reset_readers(); // set the copy back to being read-only
1028  copy_in->increment_readers<false>(); // register as reader
1029  copy_res = copy_in; // return the copy we were passed
1030  } else {
1031  if (!readonly) {
1032  new_copy->mark_mutable();
1033  }
1034  copy_res = new_copy; // return the new copy
1035  }
1036  }
1037  else {
// NOTE(review): the message is missing a space between "type" and the type
// name ("...datum of type" + name) — fix the literal in a follow-up change.
1038  throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<Value>>().pretty_name() + " but the type is not copyable");
1039  }
1040  }
1041  return copy_res;
1042  }
1043 
1044  } // namespace detail
1045 
// Initialize the TTG/PaRSEC backend: bring up MPI (if not already up),
// create and install the default World, scan PaRSEC's device list, read the
// TTG_MAX_INLINE override, and record whether all GPUs have peer access.
// May only be called once per process.
1046  inline void ttg_initialize(int argc, char **argv, int num_threads, parsec_context_t *ctx) {
1047  if (detail::initialized_mpi()) throw std::runtime_error("ttg_parsec::ttg_initialize: can only be called once");
1048 
1049  // make sure it's not already initialized
1050  int mpi_initialized;
1051  MPI_Initialized(&mpi_initialized);
1052  if (!mpi_initialized) { // MPI not initialized? do it, remember that we did it
1053  int provided;
1054  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
// NOTE(review): `!provided` only rejects provided == 0 (MPI_THREAD_SINGLE in
// common MPIs); a level like MPI_THREAD_SERIALIZED slips through — consider
// `provided < MPI_THREAD_MULTIPLE` instead.
1055  if (!provided)
1056  throw std::runtime_error("ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1057  detail::initialized_mpi() = true;
1058  } else { // no way to test that MPI was initialized with MPI_THREAD_MULTIPLE, cross fingers and proceed
1059  }
1060 
// ownership of the raw WorldImpl pointer transfers to the shared_ptr below
1062  auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads, ctx};
1063  std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
1064  ttg::World world{std::move(world_sptr)};
1065  ttg::detail::set_default_world(std::move(world));
1066 
1067  // query the first device ID
// NOTE(review): source lines 1068 and 1072 are missing from this extract;
// line 1072 presumably records `detail::first_device_id = i;` — confirm.
1069  for (int i = 0; i < parsec_nb_devices; ++i) {
1070  bool is_gpu = parsec_mca_device_is_gpu(i);
1071  if (detail::first_device_id == -1 && is_gpu) {
1073  } else if (detail::first_device_id > -1 && !is_gpu) {
1074  throw std::runtime_error("PaRSEC: Found non-GPU device in GPU ID range!");
1075  }
1076  }
1077 
1078  /* parse the maximum inline size */
1079  const char* ttg_max_inline_cstr = std::getenv("TTG_MAX_INLINE");
1080  if (nullptr != ttg_max_inline_cstr) {
// NOTE(review): std::atol returns long; a negative value wraps to a huge
// std::size_t here and the `<` guard then keeps the default — benign but
// worth an explicit validity check.
1081  std::size_t inline_size = std::atol(ttg_max_inline_cstr);
1082  if (inline_size < detail::max_inline_size) {
1083  detail::max_inline_size = inline_size;
1084  }
1085  }
1086 
1087  bool all_peer_access = true;
1088  /* check whether all GPUs can access all peer GPUs */
1089  for (int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1090  parsec_device_module_t *idevice = parsec_mca_device_get(i);
1091  if (PARSEC_DEV_IS_GPU(idevice->type)) {
1092  parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
1093  for (int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1094  if (i != j) { // because no device can access itself, says PaRSEC
1095  parsec_device_module_t *jdevice = parsec_mca_device_get(j);
1096  if (PARSEC_DEV_IS_GPU(jdevice->type)) {
// bit j of peer_access_mask encodes "device i can access device j"
1097  all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true : false;
1098  }
1099  }
1100  }
1101  }
1102  }
1103  detail::all_devices_peer_access = all_peer_access;
1104  }
// Tear down the TTG/PaRSEC backend: signal the final task, reset the default
// world, destroy all worlds, and finalize MPI iff ttg_initialize started it.
// NOTE(review): source lines 1108 and 1111 are missing from this extract
// (presumably comment text) — confirm against the repository.
1105  inline void ttg_finalize() {
1106  // We need to notify the current taskpool of termination if we are in user termination detection mode
1107  // or the parsec_context_wait() in destroy_worlds() will never complete
1109  ttg::default_execution_context().impl().final_task();
1110  ttg::detail::set_default_world(ttg::World{}); // reset the default world
1112  ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1113  if (detail::initialized_mpi()) MPI_Finalize();
1114  }
1116  [[noreturn]]
1117  inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); std::abort(); }
1118  inline void ttg_execute(ttg::World world) { world.impl().execute(); }
1119  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
1120 
1121  template <typename T>
1122  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
1123  world.impl().register_ptr(ptr);
1124  }
1125 
1126  template <typename T>
1127  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
1128  world.impl().register_ptr(std::move(ptr));
1129  }
1130 
1131  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
1132  world.impl().register_status(status_ptr);
1133  }
1134 
1135  template <typename Callback>
1136  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
1137  world.impl().register_callback(std::forward<Callback>(callback));
1138  }
1139 
1140  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
1141 
1142  inline void ttg_sum(ttg::World world, double &value) {
1143  double result = 0.0;
1144  MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
1145  value = result;
1146  }
1147 
1148  inline void make_executable_hook(ttg::World& world) {
1149  MPI_Barrier(world.impl().comm());
1150  }
1151 
// Broadcast `data` from `source_rank` to all ranks of `world`: first the
// serialized byte count, then the bytes themselves; non-source ranks then
// reconstruct `data` from the buffer.
// NOTE(review): source lines 1158, 1164 and 1168 are missing from this
// extract — presumably the serialization-size computation, the pack into
// `buf` on the source, and the unpack into `data` on receivers; confirm.
// NOTE(review): raw new[]/delete[] would be cleaner as std::vector<unsigned
// char> (exception-safe) — candidate for a follow-up change.
1154  template <typename T>
1155  void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
1156  int64_t BUFLEN;
1157  if (world.rank() == source_rank) {
1159  }
1160  MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());
1161 
1162  unsigned char *buf = new unsigned char[BUFLEN];
1163  if (world.rank() == source_rank) {
1165  }
1166  MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
1167  if (world.rank() != source_rank) {
1169  }
1170  delete[] buf;
1171  }
1172 
1173  namespace detail {
1174 
// Non-template base holding the PaRSEC-facing state shared by all TT
// instantiations; kept out of the TT class template to avoid duplicating it
// per instantiation.
1175  struct ParsecTTBase {
1176  protected:
1177  // static std::map<int, ParsecBaseTT*> function_id_to_instance;
// hash table of this TT's in-flight tasks
1178  parsec_hash_table_t tasks_table;
// hash table used by the constraint machinery (see constraints_check/complete)
1179  parsec_hash_table_t task_constraint_table;
// the PaRSEC task class describing tasks spawned by this TT
1180  parsec_task_class_t self;
1181  };
1182 
1183  } // namespace detail
1184 
// Template head of the PaRSEC TT (template task) class.
// NOTE(review): source line 1186 — the class-declaration line itself (name
// and base classes) — is missing from this extract, as is line 1188.
1185  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
1187  private:
1189  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1190  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1191  // create a virtual control input if the input list is empty, to be used in invoke()
// NOTE(review): line 1193 (the second/else arm of this conditional_t) is
// missing from this extract.
1192  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1194  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
1195  static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1196  "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1197  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "Input typelist cannot contain reference types");
1198  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
1199 
// per-TT memory pool for task allocations
1200  parsec_mempool_t mempools;
1201 
1202  // check for a non-type member named have_cuda_op
1203  template <typename T>
1204  using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1205 
1206  template <typename T>
1207  using have_hip_op_non_type_t = decltype(T::have_hip_op);
1208 
1209  template <typename T>
1210  using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1211 
// set to false once this TT is destroyed/deactivated
1212  bool alive = true;
1213 
1214  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
1215  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
1216  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
1217  static constexpr int numflows = std::max(numins, numouts); // max number of flows
1218 
1219  public:
1221  static constexpr bool derived_has_cuda_op() {
1222  return Space == ttg::ExecutionSpace::CUDA;
1223  }
1224 
1226  static constexpr bool derived_has_hip_op() {
1227  return Space == ttg::ExecutionSpace::HIP;
1228  }
1229 
1231  static constexpr bool derived_has_level_zero_op() {
1232  return Space == ttg::ExecutionSpace::L0;
1233  }
1234 
// NOTE(review): source line 1237 (the return expression) is missing from this
// extract; presumably it ORs the three space checks above
// (derived_has_cuda_op() || derived_has_hip_op() || derived_has_level_zero_op())
// — confirm against the repository.
1236  static constexpr bool derived_has_device_op() {
1238  }
1239 
// Public type aliases describing this TT's key and input/output signature.
// NOTE(review): several `using NAME =` left-hand sides (source lines 1242,
// 1244, 1246, 1248 and 1258) are missing from this extract; lines 1247/1249
// are the right-hand sides of two of those aliases — confirm their names
// against the repository.
1240  using ttT = TT;
1241  using key_type = keyT;
1243  using input_args_type = actual_input_tuple_type;
1245  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
1247  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1249  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1250  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
1251  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
1252 
1253  static constexpr int numinvals =
1254  std::tuple_size_v<input_refs_tuple_type>; // number of input arguments with values (i.e. omitting the control
1255  // input, if any)
1256 
1257  using output_terminals_type = output_terminalsT;
1259 
// Fetch element i of a tuple and cast it to an explicitly requested result
// type (preserves/forces the caller-chosen value category via static_cast).
1260  template <std::size_t i, typename resultT, typename InTuple>
1261  static resultT get(InTuple &&intuple) {
1262  return static_cast<resultT>(std::get<i>(std::forward<InTuple>(intuple)));
1263  };
// Convenience overload: fetch element i as an lvalue reference.
// (The trailing semicolons after the braces are extraneous but harmless.)
1264  template <std::size_t i, typename InTuple>
1265  static auto &get(InTuple &&intuple) {
1266  return std::get<i>(std::forward<InTuple>(intuple));
1267  };
1268 
1269  private:
1270  using task_t = detail::parsec_ttg_task_t<ttT>;
1271 
1273  friend task_t;
1274 
1275  /* the offset of the key placed after the task structure in the memory from mempool */
1276  constexpr static const size_t task_key_offset = sizeof(task_t);
1277 
1278  input_terminals_type input_terminals;
1279  output_terminalsT output_terminals;
1280 
1281  protected:
1282  const auto &get_output_terminals() const { return output_terminals; }
1283 
1284  private:
// Build, at compile time, the per-input dispatch table of member-function
// pointers used to deliver incoming argument messages (index = input number).
1285  template <std::size_t... IS>
1286  static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
1287  using resultT = decltype(set_arg_from_msg_fcts);
1288  return resultT{{&TT::set_arg_from_msg<IS>...}};
1289  }
1290  constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_arg_from_msg_fcts =
1291  make_set_args_fcts(std::make_index_sequence<numins>{});
1292 
// Dispatch table for stream-size messages, one entry per input.
1293  template <std::size_t... IS>
1294  static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
1295  using resultT = decltype(set_argstream_size_from_msg_fcts);
1296  return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
1297  }
1298  constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1299  make_set_size_fcts(std::make_index_sequence<numins>{});
1300 
// Dispatch table for stream-finalize messages, one entry per input.
1301  template <std::size_t... IS>
1302  static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1303  using resultT = decltype(finalize_argstream_from_msg_fcts);
1304  return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
1305  }
1306  constexpr static std::array<void (TT::*)(void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1307  make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
1308 
// Dispatch table for pull-terminal "get" messages, one entry per input edge.
1309  template <std::size_t... IS>
1310  static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1311  using resultT = decltype(get_from_pull_msg_fcts);
1312  return resultT{{&TT::get_from_pull_msg<IS>...}};
1313  }
1314  constexpr static std::array<void (TT::*)(void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1315  make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
1316 
// Per-input flag: is the input type const (read-only)?
1317  template<std::size_t... IS>
1318  constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
1319  using resultT = decltype(input_is_const);
1320  return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1321  }
1322  constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1323 
1324  ttg::World world;
1325  ttg::meta::detail::keymap_t<keyT> keymap;
1326  ttg::meta::detail::keymap_t<keyT> priomap;
1327  ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1328  // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
1329  ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1330  input_reducers;
// NOTE(review): "inpute" looks like a typo for "input"; renaming would touch
// other uses outside this extract, so it is left as-is here.
1331  std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = { nullptr };
// NOTE(review): aggregate init `{ max }` sets only element 0 to max — the
// remaining elements are value-initialized to 0. Verify that is intended
// (an explicit fill in the constructor may be what actually sets these).
1332  std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1333  int num_pullins = 0;
1334 
1335  bool m_defer_writer = TTG_PARSEC_DEFER_WRITER;
1336 
// constraint callbacks run before task release / after task completion
1337  std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1338  std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1339 
1340  public:
1341  ttg::World get_world() const override final { return world; }
1342 
1343  private:
1347  template <typename... Args>
1348  auto op(Args &&...args) {
1349  derivedT *derived = static_cast<derivedT *>(this);
1350  using return_type = decltype(derived->op(std::forward<Args>(args)...));
1351  if constexpr (std::is_same_v<return_type,void>) {
1352  derived->op(std::forward<Args>(args)...);
1353  return;
1354  }
1355  else {
1356  return derived->op(std::forward<Args>(args)...);
1357  }
1358  }
1359 
1360  template <std::size_t i, typename terminalT, typename Key>
1361  void invoke_pull_terminal(terminalT &in, const Key &key, detail::parsec_ttg_task_base_t *task) {
1362  if (in.is_pull_terminal) {
1363  auto owner = in.container.owner(key);
1364  if (owner != world.rank()) {
1365  get_pull_terminal_data_from<i>(owner, key);
1366  } else {
1367  // push the data to the task
1368  set_arg<i>(key, (in.container).get(key));
1369  }
1370  }
1371  }
1372 
// Send an active message to rank `owner` requesting the pull-terminal datum
// for `key` on input i; the reply is handled by get_from_pull_msg<i>.
// NOTE(review): source line 1380 (one of the msg_t constructor arguments,
// presumably the message-type tag) is missing from this extract — confirm.
1373  template <std::size_t i, typename Key>
1374  void get_pull_terminal_data_from(const int owner,
1375  const Key &key) {
1376  using msg_t = detail::msg_t;
1377  auto &world_impl = world.impl();
1378  parsec_taskpool_t *tp = world_impl.taskpool();
1379  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), tp->taskpool_id,
1381  world.rank(), 1);
1382  /* pack the key */
1383  size_t pos = 0;
1384  pos = pack(key, msg->bytes, pos);
// notify the termination-detection module of the outgoing message
1385  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1386  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
// send_am copies the payload, so the unique_ptr may free msg on return
1387  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
1388  sizeof(msg_header_t) + pos);
1389  }
1390 
1391  template <std::size_t... IS, typename Key = keyT>
1392  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
1393  int junk[] = {0, (invoke_pull_terminal<IS>(
1394  std::get<IS>(input_terminals), key, task),
1395  0)...};
1396  junk[0]++;
1397  }
1398 
// Build the tuple of (glvalue-reference) inputs for a task invocation from
// the task's data-copy array: each copy's raw pointer is reinterpreted as a
// pointer to the stored value type, dereferenced, and cast to the exact
// reference type the op() signature expects (static_cast preserves/creates
// the required value category, e.g. && for consumed inputs).
1399  template <std::size_t... IS>
1400  static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1401  return input_refs_tuple_type{static_cast<std::tuple_element_t<IS, input_refs_tuple_type>>(
1402  *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>> *>(
1403  task->copies[IS]->get_ptr()))...};
1404  }
1405 
1406 #ifdef TTG_HAVE_DEVICE
// PaRSEC GPU-task submit hook: resumes the suspended device coroutine on the
// assigned stream (launching the kernel body) and reports whether PaRSEC
// should call back again (RETURN_AGAIN) or consider the task done.
// NOTE(review): source lines 1407-1409 (the doxygen comment) and 1452-1453
// are missing from this extract.
1410  static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
1411  parsec_gpu_task_t *gpu_task,
1412  parsec_gpu_exec_stream_t *gpu_stream) {
1413 
1414  task_t *task = (task_t*)gpu_task->ec;
1415  // get the device task from the coroutine handle
1416  ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1417 
1418  task->dev_ptr->stream = gpu_stream;
1419 
1420  //std::cout << "device_static_submit task " << task << std::endl;
1421 
1422  // get the promise which contains the views
1423  auto dev_data = dev_task.promise();
1424 
1425  /* we should still be waiting for the transfer to complete */
1426  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1427  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1428 
// Publish the device/stream pair for this thread so the user's device code
// (ttg::device::current_stream etc.) targets the right backend stream.
1429 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1430  {
1431  parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
1432  int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
1433  ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
1434  }
1435 #elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1436  {
1437  parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
1438  int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
1439  ttg::device::detail::set_current(device, hip_stream->hip_stream);
1440  }
1441 #elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1442  {
1443  parsec_level_zero_exec_stream_t *stream;
1444  stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
1445  int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
1446  ttg::device::detail::set_current(device, stream->swq->queue);
1447  }
1448 #endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1449 
1450  /* Here we call back into the coroutine again after the transfers have completed */
1451  static_op(&task->parsec_task);
1452 
1454 
1455  auto discard_tmp_flows = [&](){
1456  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1457  if (gpu_task->flow[i]->flow_flags & TTG_PARSEC_FLOW_ACCESS_TMP) {
1458  /* temporary flow, discard by setting it to read-only to avoid evictions */
1459  const_cast<parsec_flow_t*>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
1460  task->parsec_task.data[i].data_out->readers = 1;
1461  }
1462  }
1463  };
1464 
1465  /* we will come back into this function once the kernel and transfers are done */
1466  int rc = PARSEC_HOOK_RETURN_DONE;
1467  if (nullptr != task->suspended_task_address) {
1468  /* Get a new handle for the promise*/
1469  dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1470  dev_data = dev_task.promise();
1471 
1472  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1473  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1474  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1475 
1476  if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1477  ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1478  /* the task started sending so we won't come back here */
1479  //std::cout << "device_static_submit task " << task << " complete" << std::endl;
1480  discard_tmp_flows();
1481  } else {
1482  //std::cout << "device_static_submit task " << task << " return-again" << std::endl;
1483  rc = PARSEC_HOOK_RETURN_AGAIN;
1484  }
1485  } else {
1486  /* the task is done so we won't come back here */
1487  //std::cout << "device_static_submit task " << task << " complete" << std::endl;
1488  discard_tmp_flows();
1489  }
1490  return rc;
1491  }
1492 
// PaRSEC "evaluate" hook for device tasks: on first invocation, allocates and
// wires up the parsec_gpu_task_t, runs the coroutine once to populate the
// flows (via register_device_memory), clones the task class with those flows,
// and optionally advises data placement from the user devicemap. Subsequent
// invocations (gpu_task already set) are unexpected and reported as an error.
1493  static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
1494 
1495  task_t *task = (task_t*)parsec_task;
1496  if (task->dev_ptr->gpu_task == nullptr) {
1497 
1498  /* set up a device task */
1499  parsec_gpu_task_t *gpu_task;
1500  /* PaRSEC wants to free the gpu_task, because F***K ownerships */
// calloc (not new) because ownership passes to PaRSEC, which free()s it
1501  gpu_task = static_cast<parsec_gpu_task_t*>(std::calloc(1, sizeof(*gpu_task)));
1502  PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
1503  gpu_task->ec = parsec_task;
1504  gpu_task->task_type = 0; // user task
1505  gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max(); // used internally
1506  gpu_task->pushout = 0;
1507  gpu_task->submit = &TT::device_static_submit;
1508 
1509  // one way to force the task device
1510  // currently this will probably break all of PaRSEC if this hint
1511  // does not match where the data is located, not really useful for us
1512  // instead we set a hint on the data if there is no hint set yet
1513  //parsec_task->selected_device = ...;
1514 
1515  /* set the gpu_task so it's available in register_device_memory */
1516  task->dev_ptr->gpu_task = gpu_task;
1517 
1518  /* TODO: is this the right place to set the mask? */
1519  task->parsec_task.chore_mask = PARSEC_DEV_ALL;
1520 
1521  /* copy over the task class, because that's what we need */
1522  task->dev_ptr->task_class = *task->parsec_task.task_class;
1523 
1524  // first invocation of the coroutine to get the coroutine handle
1525  static_op(parsec_task);
1526 
1527  /* when we come back here, the flows in gpu_task are set (see register_device_memory) */
1528 
1529  parsec_task_class_t& tc = task->dev_ptr->task_class;
1530 
1531  // input flows are set up during register_device_memory as part of the first invocation above
1532  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1533  tc.in[i] = gpu_task->flow[i];
1534  tc.out[i] = gpu_task->flow[i];
1535  }
1536  tc.nb_flows = MAX_PARAM_COUNT;
1537 
1538  /* set the device hint on the data */
1539  TT *tt = task->tt;
1540  if (tt->devicemap) {
1541  int parsec_dev;
1542  if constexpr (std::is_void_v<keyT>) {
1543  parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap());
1544  } else {
1545  parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap(task->key));
1546  }
1547  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1548  /* only set on mutable data since we have exclusive access */
1549  if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
1550  parsec_data_t *data = parsec_task->data[i].data_in->original;
1551  /* only set the preferred device if the host has the latest copy
1552  * as otherwise we may end up with the wrong data if there is a newer
1553  * version on a different device. Also, keep fingers crossed. */
// owner_device 0 is the host device in PaRSEC's device numbering
1554  if (data->owner_device == 0) {
1555  parsec_advise_data_on_device(data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
1556  }
1557  }
1558  }
1559  }
1560 
1561  /* set the new task class that contains the flows */
1562  task->parsec_task.task_class = &task->dev_ptr->task_class;
1563 
1564  /* select this one */
1565  return PARSEC_HOOK_RETURN_DONE;
1566  }
1567 
1568  std::cerr << "EVALUATE called on task with assigned GPU task!" << std::endl;
1569 
1570  /* not sure if this might happen*/
1571  return PARSEC_HOOK_RETURN_ERROR;
1572 
1573  }
1574 
// PaRSEC execution hook for device tasks: once a device has been selected,
// hand the gpu_task to the PaRSEC device kernel scheduler for the matching
// backend (CUDA/HIP/Level Zero). Returns DONE immediately when the coroutine
// already finished or jumped straight to sending results.
1575  static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
1576  static_assert(derived_has_device_op());
1577 
1578  /* when we come in here we have a device assigned and are ready to go */
1579 
1580  task_t *task = (task_t*)parsec_task;
1581 
1582  if (nullptr == task->suspended_task_address) {
1583  /* short-cut in case the task returned immediately */
1584  return PARSEC_HOOK_RETURN_DONE;
1585  }
1586 
1587  // get the device task from the coroutine handle
1588  auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1589 
1590  // get the promise which contains the views
1591  ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1592 
1593  if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1594  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1595  /* the task jumped directly to send or returned so we're done */
1596  return PARSEC_HOOK_RETURN_DONE;
1597  }
1598 
1599  parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1600  assert(NULL != device);
1601 
1602  task->dev_ptr->device = device;
1603  parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1604  parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
1605 
// Only the branch matching this TT's compile-time Space is instantiated;
// a runtime device of any other type falls through to the error below.
1606  switch(device->super.type) {
1607 
1608 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1609  case PARSEC_DEV_CUDA:
1610  if constexpr (Space == ttg::ExecutionSpace::CUDA) {
1611  /* TODO: we need custom staging functions because PaRSEC looks at the
1612  * task-class to determine the number of flows. */
1613  gpu_task->stage_in = parsec_default_gpu_stage_in;
1614  gpu_task->stage_out = parsec_default_gpu_stage_out;
1615  return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1616  }
1617  break;
1618 #endif
1619 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1620  case PARSEC_DEV_HIP:
1621  if constexpr (Space == ttg::ExecutionSpace::HIP) {
1622  gpu_task->stage_in = parsec_default_gpu_stage_in;
1623  gpu_task->stage_out = parsec_default_gpu_stage_out;
1624  return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1625  }
1626  break;
1627 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
1628 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1629  case PARSEC_DEV_LEVEL_ZERO:
1630  if constexpr (Space == ttg::ExecutionSpace::L0) {
1631  gpu_task->stage_in = parsec_default_gpu_stage_in;
1632  gpu_task->stage_out = parsec_default_gpu_stage_out;
1633  return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1634  }
1635  break;
1636 #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
1637  default:
1638  break;
1639  }
1640  ttg::print_error(task->tt->get_name(), " : received mismatching device type ", (int)device->super.type, " from PaRSEC");
1641  ttg::abort();
1642  return PARSEC_HOOK_RETURN_DONE; // will not be reached
1643  }
1644 #endif // TTG_HAVE_DEVICE
1645 
1646  static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1647 
1648  task_t *task = (task_t*)parsec_task;
1649  void* suspended_task_address =
1650 #ifdef TTG_HAVE_COROUTINE
1651  task->suspended_task_address; // non-null = need to resume the task
1652 #else // TTG_HAVE_COROUTINE
1653  nullptr;
1654 #endif // TTG_HAVE_COROUTINE
1655  //std::cout << "static_op: suspended_task_address " << suspended_task_address << std::endl;
1656  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1657 
1658  ttT *baseobj = task->tt;
1659  derivedT *obj = static_cast<derivedT *>(baseobj);
1660  assert(detail::parsec_ttg_caller == nullptr);
1661  detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1662  if (obj->tracing()) {
1663  if constexpr (!ttg::meta::is_void_v<keyT>)
1664  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": executing");
1665  else
1666  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : executing");
1667  }
1668 
1669  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1670  auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1671  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
1672  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1673  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1674  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1675  auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1676  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
1677  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1678  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
1679  } else {
1680  ttg::abort();
1681  }
1682  detail::parsec_ttg_caller = nullptr;
1683  }
1684  else { // resume the suspended coroutine
1685 
1686 #ifdef TTG_HAVE_COROUTINE
1687  assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
1688 
1689 #ifdef TTG_HAVE_DEVICE
1690  if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
1691  ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1692  assert(detail::parsec_ttg_caller == nullptr);
1693  detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1694  // TODO: unify the outputs tls handling
1695  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1696  task->tt->set_outputs_tls_ptr();
1697  coro.resume();
1698  if (coro.completed()) {
1699  coro.destroy();
1700  suspended_task_address = nullptr;
1701  }
1702  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1703  detail::parsec_ttg_caller = nullptr;
1704  } else
1705 #endif // TTG_HAVE_DEVICE
1706  if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) {
1707  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
1708  assert(ret.ready());
1709  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1710  task->tt->set_outputs_tls_ptr();
1711  ret.resume();
1712  if (ret.completed()) {
1713  ret.destroy();
1714  suspended_task_address = nullptr;
1715  }
1716  else { // not yet completed
1717  // leave suspended_task_address as is
1718 
1719  // right now can events are not properly implemented, we are only testing the workflow with dummy events
1720  // so mark the events finished manually, parsec will rerun this task again and it should complete the second time
1721  auto events = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
1722  for (auto &event_ptr : events) {
1723  event_ptr->finish();
1724  }
1725  assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
1726  }
1727  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1728  detail::parsec_ttg_caller = nullptr;
1729  task->suspended_task_address = suspended_task_address;
1730  }
1731  else
1732  ttg::abort(); // unrecognized task id
1733 #else // TTG_HAVE_COROUTINE
1734  ttg::abort(); // should not happen
1735 #endif // TTG_HAVE_COROUTINE
1736  }
1737 #ifdef TTG_HAVE_COROUTINE
1738  task->suspended_task_address = suspended_task_address;
1739 #endif // TTG_HAVE_COROUTINE
1740  if (suspended_task_address == nullptr) {
1741  ttT *baseobj = task->tt;
1742  derivedT *obj = static_cast<derivedT *>(baseobj);
1743  if (obj->tracing()) {
1744  if constexpr (!ttg::meta::is_void_v<keyT>)
1745  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
1746  else
1747  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
1748  }
1749  }
1750 
1751  return PARSEC_HOOK_RETURN_DONE;
1752  }
1753 
/// PaRSEC execution hook for TTs whose op takes no data arguments (control-only inputs).
///
/// Mirrors static_op(): if the task has no suspended coroutine it invokes the user op
/// (selecting the keyed/keyless overload at compile time); otherwise it resumes the
/// suspended resumable_task coroutine. A task left suspended after this hook is not
/// yet supported and aborts.
///
/// @param parsec_task the PaRSEC task, actually a task_t
/// @return PARSEC_HOOK_RETURN_DONE on completion (suspension path aborts before returning)
static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
  task_t *task = static_cast<task_t*>(parsec_task);

  void* suspended_task_address =
#ifdef TTG_HAVE_COROUTINE
    task->suspended_task_address;  // non-null = need to resume the task
#else  // TTG_HAVE_COROUTINE
    nullptr;
#endif  // TTG_HAVE_COROUTINE
  if (suspended_task_address == nullptr) {  // task is a coroutine that has not started or an ordinary function
    ttT *baseobj = (ttT *)task->object_ptr;
    derivedT *obj = (derivedT *)task->object_ptr;
    assert(detail::parsec_ttg_caller == NULL);
    // NOTE(review): one original line (1767) is elided from this listing here —
    // presumably it sets detail::parsec_ttg_caller to this task; confirm against the repository.
    if constexpr (!ttg::meta::is_void_v<keyT>) {
      // keyed TT: invoke with key; the macro captures a possible coroutine handle
      TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
    } else if constexpr (ttg::meta::is_void_v<keyT>) {
      // keyless TT: invoke with the output terminals only
      TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
    } else  // unreachable
      ttg:: abort();
    // NOTE(review): one original line (1774) is elided from this listing here —
    // presumably it resets detail::parsec_ttg_caller to nullptr; confirm against the repository.
  }
  else {  // resume the previously suspended coroutine
#ifdef TTG_HAVE_COROUTINE
    auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
    assert(ret.ready());
    ret.resume();
    if (ret.completed()) {
      ret.destroy();
      suspended_task_address = nullptr;
    }
    else {  // not yet completed
      // leave suspended_task_address as is
    }
#else  // TTG_HAVE_COROUTINE
    ttg::abort();  // should not happen
#endif  // TTG_HAVE_COROUTINE
  }
  // persist the (possibly cleared) coroutine handle back into the task
  task->suspended_task_address = suspended_task_address;

  if (suspended_task_address) {
    ttg::abort();  // not yet implemented
    // see comments in static_op()
    return PARSEC_HOOK_RETURN_AGAIN;
  }
  else
    return PARSEC_HOOK_RETURN_DONE;
}
1802 
/// PaRSEC hook that drains queued partial values for streaming input <i> of the
/// parent task and folds them into the parent's i-th data copy via the user reducer.
///
/// Concurrency protocol: producers enqueue copies on streams[i].reduce_copies and
/// bump streams[i].reduce_count; exactly one reducer task runs at a time and keeps
/// popping/reducing until reduce_count drops to zero (at which point another thread
/// may enqueue a fresh reducer task). When the stream goal is reached, the parent
/// task is released.
///
/// @tparam i index of the streaming input being reduced
/// @param es the PaRSEC execution stream (unused directly)
/// @param parsec_task the reducer task, actually a detail::reducer_task_t
/// @return PARSEC_HOOK_RETURN_DONE always
template <std::size_t i>
static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
  using rtask_t = detail::reducer_task_t;
  using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
  constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
  constexpr const bool input_is_const = std::is_const_v<value_t>;
  rtask_t *rtask = (rtask_t*)parsec_task;
  task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
  ttT *baseobj = parent_task->tt;
  derivedT *obj = static_cast<derivedT *>(baseobj);

  // user-provided reduction callable for this input
  auto& reducer = std::get<i>(baseobj->input_reducers);

  //std::cout << "static_reducer_op " << parent_task->key << std::endl;

  if (obj->tracing()) {
    if constexpr (!ttg::meta::is_void_v<keyT>)
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
    else
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
  }

  /* the copy to reduce into */
  detail::ttg_data_copy_t *target_copy;
  target_copy = parent_task->copies[i];
  assert(val_is_void || nullptr != target_copy);
  /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
  std::size_t c = 0;
  std::size_t size = 0;
  assert(parent_task->streams[i].reduce_count > 0);
  if (rtask->is_first) {
    // the very first reducer may find that nothing was enqueued after it was created
    if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
      /* we were the first and there is nothing to be done */
      if (obj->tracing()) {
        if constexpr (!ttg::meta::is_void_v<keyT>)
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
        else
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
      }

      return PARSEC_HOOK_RETURN_DONE;
    }
  }

  // the reducer runs in the context of the parent task
  assert(detail::parsec_ttg_caller == NULL);
  detail::parsec_ttg_caller = rtask->parent_task;

  do {
    if constexpr(!val_is_void) {
      /* the copies to reduce out of */
      detail::ttg_data_copy_t *source_copy;
      parsec_list_item_t *item;
      item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
      if (nullptr == item) {
        // maybe someone is changing the goal right now
        break;
      }
      source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
      assert(target_copy->num_readers() == target_copy->mutable_tag);
      assert(source_copy->num_readers() > 0);
      // fold the popped value into the accumulator held by the parent task
      reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
              *reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
      detail::release_data_copy(source_copy);
    } else if constexpr(val_is_void) {
      reducer(); // invoke control reducer
    }
    // there is only one task working on this stream, so no need to be atomic here
    size = ++parent_task->streams[i].size;
    //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
  } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
  //} while ((c = (--task->streams[i].reduce_count)) > 0);

  /* finalize_argstream sets goal to 1, so size may be larger than goal */
  bool complete = (size >= parent_task->streams[i].goal);

  //std::cout << "static_reducer_op size " << size
  //          << " of " << parent_task->streams[i].goal << " complete " << complete
  //          << " c " << c << std::endl;
  if (complete && c == 0) {
    if constexpr(input_is_const) {
      /* make the consumer task a reader if its input is const */
      target_copy->reset_readers();
    }
    /* task may not be runnable yet because other inputs are missing, have release_task decide */
    parent_task->remove_from_hash = true;
    parent_task->release_task(parent_task);
  }

  // NOTE(review): one original line (1891) is elided from this listing here —
  // presumably it resets detail::parsec_ttg_caller to nullptr; confirm against the repository.

  if (obj->tracing()) {
    if constexpr (!ttg::meta::is_void_v<keyT>)
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
    else
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
  }

  return PARSEC_HOOK_RETURN_DONE;
}
1902 
1903 
1904  protected:
/// Deserializes an object of type T from a raw message buffer.
///
/// If T's serialized size is not a compile-time constant, the payload size is
/// first read from the stream (it was written there by pack()); otherwise the
/// size is recomputed from the object itself.
///
/// NOTE(review): the alias dd_t (original line 1907) is elided from this listing;
/// presumably it names the data descriptor for T — confirm against the repository.
///
/// @param obj    [out] object to deserialize into
/// @param _bytes raw message buffer
/// @param pos    current read offset into _bytes
/// @return the offset just past the consumed bytes
template <typename T>
uint64_t unpack(T &obj, void *_bytes, uint64_t pos) {
  uint64_t payload_size;
  if constexpr (!dd_t::serialize_size_is_const) {
    // variable-size payload: read the size field that pack() prepended
    pos = ttg::default_data_descriptor<uint64_t>::unpack_payload(&payload_size, sizeof(uint64_t), pos, _bytes);
  } else {
    payload_size = dd_t::payload_size(&obj);
  }
  pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
  return pos;
}
1917 
/// Serializes an object of type T into a raw message buffer.
///
/// For types whose serialized size is not a compile-time constant the payload
/// size is written first, so unpack() can read it back.
///
/// NOTE(review): the alias dd_t (original line 1920) is elided from this listing;
/// presumably it names the data descriptor for T — confirm against the repository.
///
/// @param obj   object to serialize
/// @param bytes destination buffer
/// @param pos   current write offset into bytes
/// @param copy  optional data copy whose tracked iovecs are cleared before packing
/// @return the offset just past the written bytes
template <typename T>
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) {
  uint64_t payload_size = dd_t::payload_size(&obj);
  if (copy) {
    /* reset any tracked data, we don't care about the packing from the payload size */
    copy->iovec_reset();
  }

  if constexpr (!dd_t::serialize_size_is_const) {
    // variable-size payload: prepend the size so unpack() knows how much to read
    pos = ttg::default_data_descriptor<uint64_t>::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes);
  }
  pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
  return pos;
}
1933 
/// Entry point for incoming active messages targeting this TT: decodes the
/// message header and dispatches to the member handler registered for the
/// message's function id and parameter id.
///
/// NOTE(review): the `case` labels of this switch (original lines 1940, 1952,
/// 1959, 1966) are elided from this listing, so the four statement groups below
/// appear without their labels; presumably they correspond to set_arg,
/// set-argstream-size, finalize-argstream and get-from-pull message kinds —
/// confirm against the repository.
///
/// @param data raw message bytes, beginning with a msg_header_t
/// @param size total message size in bytes
/// @param bop  the receiving TT, actually a derivedT
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop) {
  assert(size >= sizeof(msg_header_t) &&
         "Trying to unpack as message that does not hold enough bytes to represent a single header");
  msg_header_t *hd = static_cast<msg_header_t *>(data);
  derivedT *obj = reinterpret_cast<derivedT *>(bop);
  switch (hd->fn_id) {
    if (0 <= hd->param_id) {
      assert(hd->param_id >= 0);
      assert(hd->param_id < obj->set_arg_from_msg_fcts.size());
      // dispatch to the per-input set_arg_from_msg<i> instantiation
      auto member = obj->set_arg_from_msg_fcts[hd->param_id];
      (obj->*member)(data, size);
    } else {
      // there is no good reason to have negative param ids
      ttg::abort();
    }
    break;
  }
  assert(hd->param_id >= 0);
  assert(hd->param_id < obj->set_argstream_size_from_msg_fcts.size());
  auto member = obj->set_argstream_size_from_msg_fcts[hd->param_id];
  (obj->*member)(data, size);
  break;
  }
  assert(hd->param_id >= 0);
  assert(hd->param_id < obj->finalize_argstream_from_msg_fcts.size());
  auto member = obj->finalize_argstream_from_msg_fcts[hd->param_id];
  (obj->*member)(data, size);
  break;
  }
  assert(hd->param_id >= 0);
  assert(hd->param_id < obj->get_from_pull_msg_fcts.size());
  auto member = obj->get_from_pull_msg_fcts[hd->param_id];
  (obj->*member)(data, size);
  break;
  }
  default:
    ttg::abort();
  }
}
1977 
1979  inline parsec_thread_mempool_t *get_task_mempool(void) {
1980  auto &world_impl = world.impl();
1981  parsec_execution_stream_s *es = world_impl.execution_stream();
1982  int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
1983  return &mempools.thread_mempools[index];
1984  }
1985 
1986  template <size_t i, typename valueT>
1987  void set_arg_from_msg_keylist(ttg::span<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
1988  /* create a dummy task that holds the copy, which can be reused by others */
1989  task_t *dummy;
1990  parsec_execution_stream_s *es = world.impl().execution_stream();
1991  parsec_thread_mempool_t *mempool = get_task_mempool();
1992  dummy = new (parsec_thread_mempool_allocate(mempool)) task_t(mempool, &this->self, this);
1993  dummy->set_dummy(true);
1994  // TODO: do we need to copy static_stream_goal in dummy?
1995 
1996  /* set the received value as the dummy's only data */
1997  dummy->copies[0] = copy;
1998 
1999  /* We received the task on this world, so it's using the same taskpool */
2000  dummy->parsec_task.taskpool = world.impl().taskpool();
2001 
2002  /* save the current task and set the dummy task */
2003  auto parsec_ttg_caller_save = detail::parsec_ttg_caller;
2004  detail::parsec_ttg_caller = dummy;
2005 
2006  /* iterate over the keys and have them use the copy we made */
2007  parsec_task_t *task_ring = nullptr;
2008  for (auto &&key : keylist) {
2009  // copy-constructible? can broadcast to any number of keys
2010  if constexpr (std::is_copy_constructible_v<valueT>) {
2011  set_arg_local_impl<i>(key, *reinterpret_cast<valueT *>(copy->get_ptr()), copy, &task_ring);
2012  }
2013  else {
2014  // not copy-constructible? can move, but only to single key
2015  static_assert(!std::is_reference_v<valueT>);
2016  if (std::size(keylist) == 1)
2017  set_arg_local_impl<i>(key, std::move(*reinterpret_cast<valueT *>(copy->get_ptr())), copy, &task_ring);
2018  else {
2019  throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<valueT>>().pretty_name() + " but the type is not copyable");
2020  }
2021  }
2022  }
2023 
2024  if (nullptr != task_ring) {
2025  auto &world_impl = world.impl();
2026  parsec_task_t *vp_task_ring[1] = { task_ring };
2027  __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2028  }
2029 
2030  /* restore the previous task */
2031  detail::parsec_ttg_caller = parsec_ttg_caller_save;
2032 
2033  /* release the dummy task */
2034  complete_task_and_release(es, &dummy->parsec_task);
2035  parsec_thread_mempool_free(mempool, &dummy->parsec_task);
2036  }
2037 
2038  // there are 6 types of set_arg:
2039  // - case 1: nonvoid Key, complete Value type
2040  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
2041  // - case 3: nonvoid Key, void Value, no inputs
2042  // - case 4: void Key, complete Value type
2043  // - case 5: void Key, void Value, mixed (data+control) inputs
2044  // - case 6: void Key, void Value, no inputs
2045  // implementation of these will be further split into "local-only" and global+local
2046 
/// Handler for an incoming set_arg active message for input <i>: unpacks the
/// target key(s) and the value (or its metadata), then either delivers the
/// value locally or initiates RMA transfers for out-of-band iovec payloads.
///
/// NOTE(review): original lines 2075-2077 and 2188-2189 are elided from this
/// listing — presumably they declare `copy`/`descr` and open the
/// split-metadata `if constexpr` branches, which is why some braces below do
/// not visibly balance; confirm against the repository.
///
/// @tparam i index of the input terminal this message targets
/// @param data raw message bytes (a detail::msg_t)
/// @param size total message size in bytes
template <std::size_t i>
void set_arg_from_msg(void *data, std::size_t size) {
  using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
  using msg_t = detail::msg_t;
  msg_t *msg = static_cast<msg_t *>(data);
  if constexpr (!ttg::meta::is_void_v<keyT>) {
    /* unpack the keys */
    /* TODO: can we avoid copying all the keys?! */
    uint64_t pos = msg->tt_id.key_offset;
    uint64_t key_end_pos;
    std::vector<keyT> keylist;
    int num_keys = msg->tt_id.num_keys;
    keylist.reserve(num_keys);
    auto rank = world.rank();
    for (int k = 0; k < num_keys; ++k) {
      keyT key;
      pos = unpack(key, msg->bytes, pos);
      // every key in the message must map to this rank
      assert(keymap(key) == rank);
      keylist.push_back(std::move(key));
    }
    key_end_pos = pos;
    /* jump back to the beginning of the message to get the value */
    pos = 0;
    // case 1: nonvoid key, nonvoid value
    if constexpr (!ttg::meta::is_void_v<valueT>) {
      using decvalueT = std::decay_t<valueT>;
      int32_t num_iovecs = msg->tt_id.num_iovecs;
      //bool inline_data = msg->inline_data;
      // split-metadata path: reconstruct the value from its metadata
      using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));

      /* unpack the metadata */
      metadata_t metadata;
      pos = unpack(metadata, msg->bytes, pos);

      //std::cout << "set_arg_from_msg splitmd num_iovecs " << num_iovecs << std::endl;

      copy = detail::create_new_datacopy(descr.create_from_metadata(metadata));
    } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
      // non-split path: default-construct, then unpack the full object
      copy = detail::create_new_datacopy(decvalueT{});
#if 0
      // TODO: first attempt at sending directly to the device
      parsec_gpu_data_copy_t* gpu_elem;
      gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
      int i = detail::first_device_id;
      int devid = detail::first_device_id;
      while (i < parsec_nb_devices) {
        if (nullptr == gpu_elem) {
          gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
          gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
          gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
          gpu_elem->version = 0;
          gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
        }
        if (nullptr == gpu_elem->device_private) {
          gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
          if (nullptr == gpu_elem->device_private) {
            devid++;
            continue;
          }
          break;
        }
      }
#endif // 0
      /* unpack the object, potentially discovering iovecs */
      pos = unpack(*static_cast<decvalueT *>(copy->get_ptr()), msg->bytes, pos);
      //std::cout << "set_arg_from_msg iovec_begin num_iovecs " << num_iovecs << " distance " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl;
      assert(std::distance(copy->iovec_begin(), copy->iovec_end()) == num_iovecs);
    }

    if (num_iovecs == 0) {
      // everything arrived inline with the message: deliver immediately
      set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
    } else {
      /* unpack the header and start the RMA transfers */

      /* get the remote rank */
      int remote = msg->tt_id.sender;
      assert(remote < world.size());

      auto &val = *static_cast<decvalueT *>(copy->get_ptr());

      bool inline_data = msg->tt_id.inline_data;

      int nv = 0;
      /* start the RMA transfers */
      auto handle_iovecs_fn =
        [&](auto&& iovecs) {

          if (inline_data) {
            /* unpack the data from the message */
            for (auto &&iov : iovecs) {
              ++nv;
              std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes);
              pos += iov.num_bytes;
            }
          } else {
            /* extract the callback tag */
            parsec_ce_tag_t cbtag;
            std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
            pos += sizeof(cbtag);

            /* create the value from the metadata; the activation fires once all
             * gets complete and then delivers the copy to the keys */
            auto activation = new detail::rma_delayed_activate(
                std::move(keylist), copy, num_iovecs, [this](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
                  set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
                  this->world.impl().decrement_inflight_msg();
                });

            using ActivationT = std::decay_t<decltype(*activation)>;

            for (auto &&iov : iovecs) {
              ++nv;
              // per-iovec wire format: [rreg_size][rreg bytes][callback fn ptr]
              parsec_ce_mem_reg_handle_t rreg;
              int32_t rreg_size_i;
              std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i));
              pos += sizeof(rreg_size_i);
              rreg = static_cast<parsec_ce_mem_reg_handle_t>(msg->bytes + pos);
              pos += rreg_size_i;
              // std::intptr_t *fn_ptr = reinterpret_cast<std::intptr_t *>(msg->bytes + pos);
              // pos += sizeof(*fn_ptr);
              std::intptr_t fn_ptr;
              std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr));
              pos += sizeof(fn_ptr);

              /* register the local memory */
              parsec_ce_mem_reg_handle_t lreg;
              size_t lreg_size;
              parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                     iov.num_bytes, &lreg, &lreg_size);
              world.impl().increment_inflight_msg();
              /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
              //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
              parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
                            &detail::get_complete_cb<ActivationT>, activation,
                            /*world.impl().parsec_ttg_rma_tag()*/
                            cbtag, &fn_ptr, sizeof(std::intptr_t));
            }
          }
        };
      handle_iovecs_fn(descr.get_data(val));
    } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
      handle_iovecs_fn(copy->iovec_span());
      copy->iovec_reset();
    }

    // sanity: all advertised iovecs were handled and the message was fully consumed
    assert(num_iovecs == nv);
    assert(size == (key_end_pos + sizeof(msg_header_t)));

    if (inline_data) {
      // inline payloads are complete now: deliver to the keys
      set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
    }
  }
  // case 2 and 3: nonvoid key, void value (pure control input)
  } else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
    for (auto &&key : keylist) {
      set_arg<i, keyT, ttg::Void>(key, ttg::Void{});
    }
  }
  // case 4: void key, nonvoid value
  } else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
    using decvalueT = std::decay_t<valueT>;
    decvalueT val;
    /* TODO: handle split-metadata case as with non-void keys */
    unpack(val, msg->bytes, 0);
    set_arg<i, keyT, valueT>(std::move(val));
  // case 5 and 6: void key, void value
  } else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
    set_arg<i, keyT, ttg::Void>(ttg::Void{});
  } else { // unreachable
    ttg::abort();
  }
}
2223 
2224  template <std::size_t i>
2225  void finalize_argstream_from_msg(void *data, std::size_t size) {
2226  using msg_t = detail::msg_t;
2227  msg_t *msg = static_cast<msg_t *>(data);
2228  if constexpr (!ttg::meta::is_void_v<keyT>) {
2229  /* unpack the key */
2230  uint64_t pos = 0;
2231  auto rank = world.rank();
2232  keyT key;
2233  pos = unpack(key, msg->bytes, pos);
2234  assert(keymap(key) == rank);
2235  finalize_argstream<i>(key);
2236  } else {
2237  auto rank = world.rank();
2238  assert(keymap() == rank);
2239  finalize_argstream<i>();
2240  }
2241  }
2242 
2243  template <std::size_t i>
2244  void argstream_set_size_from_msg(void *data, std::size_t size) {
2245  using msg_t = detail::msg_t;
2246  auto msg = static_cast<msg_t *>(data);
2247  uint64_t pos = 0;
2248  if constexpr (!ttg::meta::is_void_v<keyT>) {
2249  /* unpack the key */
2250  auto rank = world.rank();
2251  keyT key;
2252  pos = unpack(key, msg->bytes, pos);
2253  assert(keymap(key) == rank);
2254  std::size_t argstream_size;
2255  pos = unpack(argstream_size, msg->bytes, pos);
2256  set_argstream_size<i>(key, argstream_size);
2257  } else {
2258  auto rank = world.rank();
2259  assert(keymap() == rank);
2260  std::size_t argstream_size;
2261  pos = unpack(argstream_size, msg->bytes, pos);
2262  set_argstream_size<i>(argstream_size);
2263  }
2264  }
2265 
2266  template <std::size_t i>
2267  void get_from_pull_msg(void *data, std::size_t size) {
2268  using msg_t = detail::msg_t;
2269  msg_t *msg = static_cast<msg_t *>(data);
2270  auto &in = std::get<i>(input_terminals);
2271  if constexpr (!ttg::meta::is_void_v<keyT>) {
2272  /* unpack the key */
2273  uint64_t pos = 0;
2274  keyT key;
2275  pos = unpack(key, msg->bytes, pos);
2276  set_arg<i>(key, (in.container).get(key));
2277  }
2278  }
2279 
/// Sets the i-th input argument locally (no remote communication): these five
/// overloads normalize the (key, value) combinations — keyed/keyless, rvalue/
/// lvalue/shared_ptr value — and forward to set_arg_local_impl<i>.

// nonvoid key, forwarded (possibly moved) value
template <std::size_t i, typename Key, typename Value>
std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    const Key &key, Value &&value) {
  set_arg_local_impl<i>(key, std::forward<Value>(value));
}

// void key, forwarded (possibly moved) value
template <std::size_t i, typename Key = keyT, typename Value>
std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    Value &&value) {
  set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value));
}

// nonvoid key, const lvalue value (copied)
template <std::size_t i, typename Key, typename Value>
std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    const Key &key, const Value &value) {
  set_arg_local_impl<i>(key, value);
}

// void key, const lvalue value (copied)
template <std::size_t i, typename Key = keyT, typename Value>
std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    const Value &value) {
  set_arg_local_impl<i>(ttg::Void{}, value);
}

// void key, value held by shared_ptr (dereferenced and copied)
template <std::size_t i, typename Key = keyT, typename Value>
std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    std::shared_ptr<const Value> &valueptr) {
  set_arg_local_impl<i>(ttg::Void{}, *valueptr);
}
2309 
2310  template <typename Key>
2311  task_t *create_new_task(const Key &key) {
2312  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2313  auto &world_impl = world.impl();
2314  task_t *newtask;
2315  parsec_thread_mempool_t *mempool = get_task_mempool();
2316  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2317  int32_t priority = 0;
2318  if constexpr (!keyT_is_Void) {
2319  priority = priomap(key);
2320  /* placement-new the task */
2321  newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority);
2322  } else {
2323  priority = priomap();
2324  /* placement-new the task */
2325  newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority);
2326  }
2327 
2328  for (int i = 0; i < static_stream_goal.size(); ++i) {
2329  newtask->streams[i].goal = static_stream_goal[i];
2330  }
2331 
2332  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task");
2333  return newtask;
2334  }
2335 
2336 
/// Allocates and placement-constructs a reducer task for streaming input <i>
/// of \p task, reusing the regular task memory pool.
///
/// NOTE(review): the function signature (original line 2338) is elided from this
/// listing; presumably it is
/// `detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first)`
/// — confirm against the repository.
template <std::size_t i>
  /* make sure we can reuse the existing memory pool and don't have to create a new one */
  static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
  auto &world_impl = world.impl();
  detail::reducer_task_t *newtask;
  parsec_thread_mempool_t *mempool = get_task_mempool();
  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
  // use the priority of the task we stream into
  int32_t priority = 0;
  if constexpr (!keyT_is_Void) {
    priority = priomap(task->key);
    ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task");
  } else {
    priority = priomap();
    ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
  }
  /* placement-new the task */
  newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
                                                 world_impl.taskpool(), priority, is_first);

  return newtask;
}
2361 
2362 
2363  // Used to set the i'th argument
2364  template <std::size_t i, typename Key, typename Value>
2365  void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr,
2366  parsec_task_t **task_ring = nullptr) {
2367  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2368  constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2369  constexpr const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2370  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2371 
2372 
2373  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
2374 
2375  parsec_key_t hk = 0;
2376  if constexpr (!keyT_is_Void) {
2377  hk = reinterpret_cast<parsec_key_t>(&key);
2378  assert(keymap(key) == world.rank());
2379  }
2380 
2381  task_t *task;
2382  auto &world_impl = world.impl();
2383  auto &reducer = std::get<i>(input_reducers);
2384  bool release = false;
2385  bool remove_from_hash = true;
2386 #if defined(PARSEC_PROF_GRAPHER)
2387  bool discover_task = true;
2388 #endif
2389  bool get_pull_data = false;
2390  bool has_lock = false;
2391  /* If we have only one input and no reducer on that input we can skip the hash table */
2392  if (numins > 1 || reducer) {
2393  has_lock = true;
2394  parsec_hash_table_lock_bucket(&tasks_table, hk);
2395  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
2396  task = create_new_task(key);
2397  world_impl.increment_created();
2398  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
2399  get_pull_data = !is_lazy_pull();
2400  if( world_impl.dag_profiling() ) {
2401 #if defined(PARSEC_PROF_GRAPHER)
2402  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2403  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2404 #endif
2405  }
2406  } else if (!reducer && numins == (task->in_data_count + 1)) {
2407  /* remove while we have the lock */
2408  parsec_hash_table_nolock_remove(&tasks_table, hk);
2409  remove_from_hash = false;
2410  }
2411  /* if we have a reducer, we need to hold on to the lock for just a little longer */
2412  if (!reducer) {
2413  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2414  has_lock = false;
2415  }
2416  } else {
2417  task = create_new_task(key);
2418  world_impl.increment_created();
2419  remove_from_hash = false;
2420  if( world_impl.dag_profiling() ) {
2421 #if defined(PARSEC_PROF_GRAPHER)
2422  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2423  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2424 #endif
2425  }
2426  }
2427 
2428  if( world_impl.dag_profiling() ) {
2429 #if defined(PARSEC_PROF_GRAPHER)
2430  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2432  char orig_str[32];
2433  char dest_str[32];
2434  if(orig_index >= 0) {
2435  snprintf(orig_str, 32, "%d", orig_index);
2436  } else {
2437  strncpy(orig_str, "_", 32);
2438  }
2439  snprintf(dest_str, 32, "%lu", i);
2440  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2441  .flow_index = 0, .flow_datatype_mask = ~0 };
2442  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2443  .flow_index = 0, .flow_datatype_mask = ~0 };
2444  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, discover_task ? 1 : 0, &orig, &dest);
2445  }
2446 #endif
2447  }
2448 
2449  auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
2450  detail::ttg_data_copy_t *copy = copy_in;
2451  if (nullptr == copy && nullptr != detail::parsec_ttg_caller) {
2453  }
2454  if (nullptr != copy) {
2455  /* retain the data copy */
2456  copy = detail::register_data_copy<valueT>(copy, task, is_const);
2457  } else {
2458  /* create a new copy */
2459  copy = detail::create_new_datacopy(std::forward<Value>(value));
2460  if (!is_const) {
2461  copy->mark_mutable();
2462  }
2463  }
2464  return copy;
2465  };
2466 
2467  if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
2468  auto submit_reducer_task = [&](auto *parent_task){
2469  /* check if we need to create a task */
2470  std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2471  //std::cout << "submit_reducer_task " << key << " c " << c << std::endl;
2472  if (0 == c) {
2473  /* we are responsible for creating the reduction task */
2474  detail::reducer_task_t *reduce_task;
2475  reduce_task = create_new_reducer_task<i>(parent_task, false);
2476  reduce_task->release_task(reduce_task); // release immediately
2477  }
2478  };
2479 
2480  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
2481  // have a value already? if not, set, otherwise reduce
2482  detail::ttg_data_copy_t *copy = nullptr;
2483  if (nullptr == (copy = task->copies[i])) {
2484  using decay_valueT = std::decay_t<valueT>;
2485 
2486  /* first input value, create a task and bind it to the copy */
2487  //std::cout << "Creating new reducer task for " << key << std::endl;
2488  detail::reducer_task_t *reduce_task;
2489  reduce_task = create_new_reducer_task<i>(task, true);
2490 
2491  /* protected by the bucket lock */
2492  task->streams[i].size = 1;
2493  task->streams[i].reduce_count.store(1, std::memory_order_relaxed);
2494 
2495  /* get the copy to use as input for this task */
2496  detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);
2497 
2498  /* put the copy into the task */
2499  task->copies[i] = copy;
2500 
2501  /* release the task if we're not deferred
2502  * TODO: can we delay that until we get the second value?
2503  */
2504  if (copy->get_next_task() != &reduce_task->parsec_task) {
2505  reduce_task->release_task(reduce_task);
2506  }
2507 
2508  /* now we can unlock the bucket */
2509  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2510  } else {
2511  /* unlock the bucket, the lock is not needed anymore */
2512  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2513 
2514  /* get the copy to use as input for this task */
2515  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);
2516 
2517  /* enqueue the data copy to be reduced */
2518  parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
2519  submit_reducer_task(task);
2520  }
2521  } else {
2522  /* unlock the bucket, the lock is not needed anymore */
2523  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2524  /* submit reducer for void values to handle side effects */
2525  submit_reducer_task(task);
2526  }
2527  //if (release) {
2528  // parsec_hash_table_nolock_remove(&tasks_table, hk);
2529  // remove_from_hash = false;
2530  //}
2531  //parsec_hash_table_unlock_bucket(&tasks_table, hk);
2532  } else {
2533  /* unlock the bucket, the lock is not needed anymore */
2534  if (has_lock) {
2535  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2536  }
2537  /* whether the task needs to be deferred or not */
2538  if constexpr (!valueT_is_Void) {
2539  if (nullptr != task->copies[i]) {
2540  ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i);
2541  throw std::logic_error("bad set arg");
2542  }
2543 
2544  /* get the copy to use as input for this task */
2545  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);
2546 
2547  /* if we registered as a writer and were the first to register with this copy
2548  * we need to defer the release of this task to give other tasks a chance to
2549  * make a copy of the original data */
2550  release = (copy->get_next_task() != &task->parsec_task);
2551  task->copies[i] = copy;
2552  } else {
2553  release = true;
2554  }
2555  }
2556  task->remove_from_hash = remove_from_hash;
2557  if (release) {
2558  release_task(task, task_ring);
2559  }
2560  /* if not pulling lazily, pull the data here */
2561  if constexpr (!ttg::meta::is_void_v<keyT>) {
2562  if (get_pull_data) {
2563  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2564  }
2565  }
2566  }
2567 
2569  bool constrained = false;
2570  if (constraints_check.size() > 0) {
2571  if constexpr (ttg::meta::is_void_v<keyT>) {
2572  constrained = !constraints_check[0]();
2573  } else {
2574  constrained = !constraints_check[0](task->key);
2575  }
2576  }
2577  if (constrained) {
2578  // store the task so we can later access it once it is released
2579  parsec_hash_table_insert(&task_constraint_table, &task->tt_ht_item);
2580  }
2581  return !constrained;
2582  }
2583 
2584  template<typename Key = keyT>
2585  std::enable_if_t<ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid) {
2586  // check the next constraint, if any
2587  assert(cid < constraints_check.size());
2588  bool release = true;
2589  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2590  if (!constraints_check[i]()) {
2591  release = false;
2592  break;
2593  }
2594  }
2595  if (release) {
2596  // no constraint blocked us
2597  task_t *task;
2598  parsec_key_t hk = 0;
2599  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2600  assert(task != nullptr);
2601  auto &world_impl = world.impl();
2602  parsec_execution_stream_t *es = world_impl.execution_stream();
2603  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2604  __parsec_schedule_vp(es, vp_task_rings, 0);
2605  }
2606  }
2607 
2608  template<typename Key = keyT>
2609  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid, const std::span<Key>& keys) {
2610  assert(cid < constraints_check.size());
2611  parsec_task_t *task_ring = nullptr;
2612  for (auto& key : keys) {
2613  task_t *task;
2614  bool release = true;
2615  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2616  if (!constraints_check[i](key)) {
2617  release = false;
2618  break;
2619  }
2620  }
2621 
2622  if (release) {
2623  // no constraint blocked this task, so go ahead and release
2624  auto hk = reinterpret_cast<parsec_key_t>(&key);
2625  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2626  assert(task != nullptr);
2627  if (task_ring == nullptr) {
2628  /* the first task is set directly */
2629  task_ring = &task->parsec_task;
2630  } else {
2631  /* push into the ring */
2632  parsec_list_item_ring_push_sorted(&task_ring->super, &task->parsec_task.super,
2633  offsetof(parsec_task_t, priority));
2634  }
2635  }
2636  }
2637  if (nullptr != task_ring) {
2638  auto &world_impl = world.impl();
2639  parsec_execution_stream_t *es = world_impl.execution_stream();
2640  parsec_task_t *vp_task_rings[1] = { task_ring };
2641  __parsec_schedule_vp(es, vp_task_rings, 0);
2642  }
2643  }
2644 
2645  void release_task(task_t *task,
2646  parsec_task_t **task_ring = nullptr) {
2647  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2648 
2649  /* if remove_from_hash == false, someone has already removed the task from the hash table
2650  * so we know that the task is ready, no need to do atomic increments here */
2651  bool is_ready = !task->remove_from_hash;
2652  int32_t count;
2653  if (is_ready) {
2654  count = numins;
2655  } else {
2656  count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1;
2657  assert(count <= self.dependencies_goal);
2658  }
2659 
2660  auto &world_impl = world.impl();
2661  ttT *baseobj = task->tt;
2662 
2663  if (count == numins) {
2664  parsec_execution_stream_t *es = world_impl.execution_stream();
2665  parsec_key_t hk = task->pkey();
2666  if (tracing()) {
2667  if constexpr (!keyT_is_Void) {
2668  ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": submitting task for op ");
2669  } else {
2670  ttg::trace(world.rank(), ":", get_name(), ": submitting task for op ");
2671  }
2672  }
2673  if (task->remove_from_hash) parsec_hash_table_remove(&tasks_table, hk);
2674 
2675  if (check_constraints(task)) {
2676  if (nullptr == task_ring) {
2677  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2678  __parsec_schedule_vp(es, vp_task_rings, 0);
2679  } else if (*task_ring == nullptr) {
2680  /* the first task is set directly */
2681  *task_ring = &task->parsec_task;
2682  } else {
2683  /* push into the ring */
2684  parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->parsec_task.super,
2685  offsetof(parsec_task_t, priority));
2686  }
2687  }
2688  } else if constexpr (!ttg::meta::is_void_v<keyT>) {
2689  if ((baseobj->num_pullins + count == numins) && baseobj->is_lazy_pull()) {
2690  /* lazily pull the pull terminal data */
2691  baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2692  }
2693  }
2694  }
2695 
  // cases 1+2: nonvoid key, nonvoid value.
  // Forwards to set_arg_impl, which routes the value to the owner of the key
  // (locally or via an active message).
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(const Key &key,
                                                                                                    Value &&value) {
    set_arg_impl<i>(key, std::forward<Value>(value));
  }
2702 
  // cases 4+5+6: void key, nonvoid value.
  // Forwards to set_arg_impl with a ttg::Void placeholder key.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
    set_arg_impl<i>(ttg::Void{}, std::forward<Value>(value));
  }
2708 
  // void key and void value: forwards to set_arg_impl with placeholder
  // ttg::Void for both key and value (pure control-flow argument).
  template <std::size_t i, typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_arg() {
    set_arg_impl<i>(ttg::Void{}, ttg::Void{});
  }
2713 
  // case 3: nonvoid key, void value.
  // Forwards to set_arg_impl with a ttg::Void placeholder value.
  template <std::size_t i, typename Key>
  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_arg(const Key &key) {
    set_arg_impl<i>(key, ttg::Void{});
  }
2719 
  // Decide whether a value (plus its key) is small enough to be shipped
  // inline inside the active message instead of via RMA transfers.
  // The estimated message payload is: packed key size + metadata size + total
  // iovec bytes; if it is below detail::max_inline_size the data is inlined.
  // @param value_ptr pointer to the value to be sent
  // @param copy      the data copy holding the value's iovecs (non-split-metadata path)
  // @param key       the task key, packed at the end of the message
  // @param num_keys  number of keys that may share this message
  //                  NOTE(review): not used in the visible computation (only one
  //                  key's pack size is accounted for) — confirm intent
  template<typename Value, typename Key>
  bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) {
    using decvalueT = std::decay_t<Value>;
    bool inline_data = false;
    /* check whether to send data in inline */
    std::size_t iov_size = 0;
    std::size_t metadata_size = 0;
    if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
      /* split-metadata types report their iovecs and metadata through a descriptor
       * (NOTE(review): `descr` declaration not visible in this excerpt — confirm) */
      auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
      iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
                                 [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
      auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
      metadata_size = ttg::default_data_descriptor<decltype(metadata)>::payload_size(&metadata);
    } else {
      /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */
      metadata_size = ttg::default_data_descriptor<ttg::meta::remove_cvr_t<Value>>::payload_size(value_ptr);
      iov_size = std::accumulate(copy->iovec_begin(), copy->iovec_end(), 0,
                                 [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
    }
    /* key is packed at the end */
    std::size_t key_pack_size = ttg::default_data_descriptor<Key>::payload_size(&key);
    std::size_t pack_size = key_pack_size + metadata_size + iov_size;
    if (pack_size < detail::max_inline_size) {
      inline_data = true;
    }
    return inline_data;
  }
2748 
  // Used to set the i'th argument.
  // Routes (key, value) to input terminal i of the task identified by key:
  // if the key maps to this rank the argument is handed to set_arg_local_impl;
  // otherwise the value is serialized into an active message — small payloads
  // are inlined right after the header, large ones are exposed as registered
  // RMA regions that the receiver gets remotely — and sent to the owner rank.
  // @param key     the task key (ttg::Void for keyless TTs)
  // @param value   the value for terminal i (may be moved from)
  // @param copy_in optional existing data copy to reuse instead of creating one
  template <std::size_t i, typename Key, typename Value>
  void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr) {
    int owner;
    using decvalueT = std::decay_t<Value>;
    using norefvalueT = std::remove_reference_t<Value>;
    norefvalueT *value_ptr = &value;

#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
    }
#endif

    /* determine the owner rank of this key */
    if constexpr (!ttg::meta::is_void_v<Key>)
      owner = keymap(key);
    else
      owner = keymap();
    if (owner == world.rank()) {
      /* local delivery: no serialization needed */
      if constexpr (!ttg::meta::is_void_v<keyT>)
        set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
      else
        set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value), copy_in);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      if(world.impl().profiling()) {
        parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
      }
#endif
      return;
    }
    // the target task is remote. Pack the information and send it to
    // the corresponding peer.
    // TODO do we need to copy value?
    using msg_t = detail::msg_t;
    auto &world_impl = world.impl();
    uint64_t pos = 0;
    int num_iovecs = 0;
    std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                         msg_header_t::MSG_SET_ARG, i, world_impl.rank(), 1);

    if constexpr (!ttg::meta::is_void_v<decvalueT>) {

      detail::ttg_data_copy_t *copy = copy_in;
      /* make sure we have a data copy to register with */
      if (nullptr == copy) {
        /* NOTE(review): a lookup of the caller's existing copy appears to be
         * elided in this excerpt before the inner null check — confirm against
         * the full source */
        if (nullptr == copy) {
          // We need to create a copy for this data, as it does not exist yet.
          copy = detail::create_new_datacopy(std::forward<Value>(value));
          // use the new value from here on out
          value_ptr = static_cast<norefvalueT*>(copy->get_ptr());
        }
      }

      /* decide once whether the payload travels inline or via RMA */
      bool inline_data = can_inline_data(value_ptr, copy, key, 1);
      msg->tt_id.inline_data = inline_data;

      /* serializes the given iovecs into the message: raw bytes when inlining,
       * otherwise (callback tag, registration handle, completion-fn pointer)
       * triples that let the receiver fetch each region via RMA */
      auto handle_iovec_fn = [&](auto&& iovecs){

        if (inline_data) {
          /* inline data is packed right after the tt_id in the message */
          for (auto &&iov : iovecs) {
            std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
            pos += iov.num_bytes;
          }
        } else {

          /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
           * raw function pointer instead of a preregistered AM tag, so play that game.
           * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
          parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
          std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
          pos += sizeof(cbtag);

          for (auto &&iov : iovecs) {
            /* retain the copy once per region; released by the completion fn below */
            copy = detail::register_data_copy<decvalueT>(copy, nullptr, true);
            parsec_ce_mem_reg_handle_t lreg;
            size_t lreg_size;
            /* TODO: only register once when we can broadcast the data! */
            parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                   iov.num_bytes, &lreg, &lreg_size);
            /* shared_ptr keeps the registration alive until the remote read completed */
            auto lreg_ptr = std::shared_ptr<void>{lreg, [](void *ptr) {
              parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr;
              parsec_ce.mem_unregister(&memreg);
            }};
            int32_t lreg_size_i = lreg_size;
            std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
            pos += sizeof(lreg_size_i);
            std::memcpy(msg->bytes + pos, lreg, lreg_size);
            pos += lreg_size;
            //std::cout << "set_arg_impl lreg " << lreg << std::endl;
            /* TODO: can we avoid the extra indirection of going through std::function? */
            std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
              /* shared_ptr of value and registration captured by value so resetting
               * them here will eventually release the memory/registration */
              lreg_ptr.reset();
            });
            std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
            std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
            pos += sizeof(fn_ptr);
          }
        }
      };

      if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* split-metadata type: pack only the metadata, expose data as iovecs
         * (NOTE(review): `descr` declaration not visible in this excerpt) */
        auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
        num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
        /* pack the metadata */
        auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
        pos = pack(metadata, msg->bytes, pos);
        //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl;
        handle_iovec_fn(iovs);
      } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* serialize the object */
        //std::cout << "PRE pack num_iovecs " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl;
        pos = pack(*value_ptr, msg->bytes, pos, copy);
        num_iovecs = std::distance(copy->iovec_begin(), copy->iovec_end());
        //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl;
        /* handle any iovecs contained in it */
        handle_iovec_fn(copy->iovec_span());
        copy->iovec_reset();
      }

      msg->tt_id.num_iovecs = num_iovecs;
    }

    /* pack the key */
    msg->tt_id.num_keys = 0;
    msg->tt_id.key_offset = pos;
    if constexpr (!ttg::meta::is_void_v<Key>) {
      size_t tmppos = pack(key, msg->bytes, pos);
      pos = tmppos;
      msg->tt_id.num_keys = 1;
    }

    /* hand the assembled message to the communication engine */
    parsec_taskpool_t *tp = world_impl.taskpool();
    tp->tdm.module->outgoing_message_start(tp, owner, NULL);
    tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
    //std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
    parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                      sizeof(msg_header_t) + pos);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
    }
#endif
#if defined(PARSEC_PROF_GRAPHER)
    /* record the caller->callee dependency edge in the DAG grapher; a throwaway
     * task object stands in for the remote task */
    if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
      /* NOTE(review): computation of orig_index appears elided in this excerpt —
       * confirm against the full source */
      char orig_str[32];
      char dest_str[32];
      if(orig_index >= 0) {
        snprintf(orig_str, 32, "%d", orig_index);
      } else {
        strncpy(orig_str, "_", 32);
      }
      snprintf(dest_str, 32, "%lu", i);
      parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                          .flow_index = 0, .flow_datatype_mask = ~0 };
      parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                          .flow_index = 0, .flow_datatype_mask = ~0 };
      task_t *task = create_new_task(key);
      parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, 0, &orig, &dest);
      delete task;
    }
#endif
  }
2922 
  // Deliver `value` to input terminal i of every local task whose key lies in
  // [begin, end). Ready tasks are accumulated in a ring and submitted to the
  // scheduler with a single call at the end.
  // @param begin/end iterator range over local keys
  // @param value     the value to assign (shared across all keys via a data copy)
  template <int i, typename Iterator, typename Value>
  void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value) {
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
    }
#endif
    parsec_task_t *task_ring = nullptr;
    detail::ttg_data_copy_t *copy = nullptr;
    if (nullptr != detail::parsec_ttg_caller) {
      /* NOTE(review): branch body appears elided in this excerpt — presumably
       * looks up the caller's existing copy of `value`; confirm against the
       * full source */
    }

    for (auto it = begin; it != end; ++it) {
      set_arg_local_impl<i>(*it, value, copy, &task_ring);
    }
    /* submit all ready tasks at once */
    if (nullptr != task_ring) {
      parsec_task_t *vp_task_ring[1] = { task_ring };
      __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
    }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
    }
#endif
  }
2950 
  // Broadcast `value` to input terminal i of all tasks named in `keylist`.
  // Local keys are handled by broadcast_arg_local; remote keys are grouped by
  // owner rank (keys sorted so this rank comes first, then rank+1, ...,
  // wrapping around) and one active message is sent per owner, with the value
  // serialized once and re-used for every destination.
  // @param keylist keys of the destination tasks (may mix local and remote)
  // @param value   the value to broadcast
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
                   void>
  broadcast_arg(const ttg::span<const Key> &keylist, const Value &value) {
    using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
    auto world = ttg_default_execution_context();
    auto np = world.size();
    int rank = world.rank();
    uint64_t pos = 0;
    /* remote work is needed only if at least one key maps to another rank */
    bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
                                                     [&](const Key &key) { return keymap(key) != rank; });

    if (have_remote) {
      using decvalueT = std::decay_t<Value>;

      /* sort the input key list by owner and check whether there are remote keys */
      std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
      std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](const Key &a, const Key &b) mutable {
        int rank_a = keymap(a);
        int rank_b = keymap(b);
        // sort so that the keys for my rank are first, rank+1 next, ..., wrapping around to 0
        int pos_a = (rank_a + np - rank) % np;
        int pos_b = (rank_b + np - rank) % np;
        return pos_a < pos_b;
      });

      /* Assuming there are no local keys, will be updated while iterating over the keys */
      auto local_begin = keylist_sorted.end();
      auto local_end = keylist_sorted.end();

      int32_t num_iovs = 0;

      /* NOTE(review): declaration/lookup of `copy` appears elided in this
       * excerpt — confirm against the full source */
      assert(nullptr != copy);

      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           msg_header_t::MSG_SET_ARG, i, world_impl.rank());

      /* check if we inline the data */
      /* TODO: this assumes the worst case: that all keys are packed at once (i.e., go to the same remote). Can we do better?*/
      bool inline_data = can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
      msg->tt_id.inline_data = inline_data;

      /* (size, registration) pairs collected once and re-serialized per owner */
      std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
      auto handle_iovs_fn = [&](auto&& iovs){

        if (inline_data) {
          /* inline data is packed right after the tt_id in the message */
          for (auto &&iov : iovs) {
            std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
            pos += iov.num_bytes;
          }
        } else {

          /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
           * raw function pointer instead of a preregistered AM tag, so play that game.
           * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
          parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
          std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
          pos += sizeof(cbtag);

          for (auto &&iov : iovs) {
            parsec_ce_mem_reg_handle_t lreg;
            size_t lreg_size;
            parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                   iov.num_bytes, &lreg, &lreg_size);
            /* TODO: use a static function for deregistration here? */
            memregs.push_back(std::make_pair(static_cast<int32_t>(lreg_size),
                                             /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */
                                             std::shared_ptr<void>{lreg, [](void *ptr) {
                                               parsec_ce_mem_reg_handle_t memreg =
                                                   (parsec_ce_mem_reg_handle_t)ptr;
                                               //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
                                               parsec_ce.mem_unregister(&memreg);
                                             }}));
            //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
          }
        }
      };

      if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* split-metadata type: pack only the metadata, register the data iovecs
         * (NOTE(review): `descr` declaration not visible in this excerpt) */
        /* pack the metadata */
        auto metadata = descr.get_metadata(value);
        pos = pack(metadata, msg->bytes, pos);
        auto iovs = descr.get_data(*const_cast<decvalueT *>(&value));
        num_iovs = std::distance(std::begin(iovs), std::end(iovs));
        memregs.reserve(num_iovs);
        handle_iovs_fn(iovs);
        //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl;
      } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* serialize the object once */
        pos = pack(value, msg->bytes, pos, copy);
        num_iovs = std::distance(copy->iovec_begin(), copy->iovec_end());
        handle_iovs_fn(copy->iovec_span());
        copy->iovec_reset();
      }

      msg->tt_id.num_iovecs = num_iovs;

      /* remember where the shared (value) part of the message ends so the
       * per-owner part can be rewritten for every destination */
      std::size_t save_pos = pos;

      parsec_taskpool_t *tp = world_impl.taskpool();
      for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {

        auto owner = keymap(*it);
        if (owner == rank) {
          local_begin = it;
          /* find first non-local key */
          local_end =
              std::find_if_not(++it, keylist_sorted.end(), [&](const Key &key) { return keymap(key) == rank; });
          it = local_end;
          continue;
        }

        /* rewind the buffer and start packing a new set of memregs and keys */
        pos = save_pos;
        if (!inline_data) {
          for (int idx = 0; idx < num_iovs; ++idx) {
            // auto [lreg_size, lreg_ptr] = memregs[idx];
            int32_t lreg_size;
            std::shared_ptr<void> lreg_ptr;
            std::tie(lreg_size, lreg_ptr) = memregs[idx];
            std::memcpy(msg->bytes + pos, &lreg_size, sizeof(lreg_size));
            pos += sizeof(lreg_size);
            std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
            pos += lreg_size;
            //std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
            /* mark another reader on the copy */
            copy = detail::register_data_copy<valueT>(copy, nullptr, true);
            /* create a function that will be invoked upon RMA completion at the target */
            std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
              /* shared_ptr of value and registration captured by value so resetting
               * them here will eventually release the memory/registration */
              lreg_ptr.reset();
            });
            std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
            std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
            pos += sizeof(fn_ptr);
          }
        }

        /* mark the beginning of the keys */
        msg->tt_id.key_offset = pos;

        /* pack all keys for this owner */
        int num_keys = 0;
        do {
          ++num_keys;
          pos = pack(*it, msg->bytes, pos);
          ++it;
        } while (it < keylist_sorted.end() && keymap(*it) == owner);
        msg->tt_id.num_keys = num_keys;

        tp->tdm.module->outgoing_message_start(tp, owner, NULL);
        tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
        //std::cout << "broadcast_arg send_am owner " << owner << std::endl;
        parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                          sizeof(msg_header_t) + pos);
      }
      /* handle local keys */
      broadcast_arg_local<i>(local_begin, local_end, value);
    } else {
      /* handle local keys */
      broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
    }
  }
3127 
3128  // Used by invoke to set all arguments associated with a task
3129  // Is: index sequence of elements in args
3130  // Js: index sequence of input terminals to set
3131  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
3132  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...>,
3133  std::index_sequence<Js...>, const Key &key,
3134  const std::tuple<Ts...> &args) {
3135  static_assert(sizeof...(Js) == sizeof...(Is));
3136  constexpr size_t js[] = {Js...};
3137  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3138  junk[0]++;
3139  }
3140 
  // Used by invoke to set all arguments associated with a task.
  // Convenience overload: pairs args elementwise with terminals (terminal Is
  // receives args[Is]) by delegating to the two-index-sequence overload.
  // Is: index sequence of input terminals to set
  template <typename Key, typename... Ts, size_t... Is>
  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
                                                                  const std::tuple<Ts...> &args) {
    set_args(std::index_sequence_for<Ts...>{}, is, key, args);
  }
3148 
3149  // Used by invoke to set all arguments associated with a task
3150  // Is: index sequence of elements in args
3151  // Js: index sequence of input terminals to set
3152  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
3153  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3154  const std::tuple<Ts...> &args) {
3155  static_assert(sizeof...(Js) == sizeof...(Is));
3156  constexpr size_t js[] = {Js...};
3157  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3158  junk[0]++;
3159  }
3160 
  // Used by invoke to set all arguments associated with a (keyless) task.
  // Convenience overload: pairs args elementwise with terminals (terminal Is
  // receives args[Is]) by delegating to the two-index-sequence overload.
  // Is: index sequence of input terminals to set
  template <typename Key = keyT, typename... Ts, size_t... Is>
  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
                                                             const std::tuple<Ts...> &args) {
    set_args(std::index_sequence_for<Ts...>{}, is, args);
  }
3168 
3169  public:
3172  template <std::size_t i>
3173  void set_static_argstream_size(std::size_t size) {
3174  assert(std::get<i>(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal");
3175  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
3176 
3177  this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
3178 
3179  // Check if stream is already bounded
3180  if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3181  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
3182  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
3183  }
3184 
3185  static_stream_goal[i] = size;
3186  }
3187 
  // Set the stream goal of streaming input terminal `i` for the task `key`.
  // If the key is owned by another rank, the request is forwarded as an active
  // message; otherwise the task's stream goal is updated in place and the task
  // released if the goal is already met.
  // @param key  the task whose stream is being bounded
  // @param size number of inputs the stream must receive before it completes
  template <std::size_t i, typename Key>
  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_argstream_size(const Key &key, std::size_t size) {
    // preconditions
    assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
    assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");

    // body
    const auto owner = keymap(key);
    if (owner != world.rank()) {
      ttg::trace(world.rank(), ":", get_name(), ":", key, " : forwarding stream size for terminal ", i);
      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      uint64_t pos = 0;
      /* NOTE(review): the msg_header_t message-type argument appears elided in
       * this excerpt — confirm against the full source */
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           world_impl.rank(), 1);
      /* pack the key */
      pos = pack(key, msg->bytes, pos);
      pos = pack(size, msg->bytes, pos);
      parsec_taskpool_t *tp = world_impl.taskpool();
      tp->tdm.module->outgoing_message_start(tp, owner, NULL);
      tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
      parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                        sizeof(msg_header_t) + pos);
    } else {
      ttg::trace(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i);

      /* find the task in the hash table, creating it if it does not exist yet */
      auto hk = reinterpret_cast<parsec_key_t>(&key);
      task_t *task;
      parsec_hash_table_lock_bucket(&tasks_table, hk);
      if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
        task = create_new_task(key);
        world.impl().increment_created();
        parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
        if( world.impl().dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
          parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
#endif
        }
      }
      parsec_hash_table_unlock_bucket(&tasks_table, hk);

      // TODO: Unfriendly implementation, cannot check if stream is already bounded
      // TODO: Unfriendly implementation, cannot check if stream has been finalized already

      // commit changes
      // 1) "lock" the stream by incrementing the reduce_count
      // 2) set the goal
      // 3) "unlock" the stream
      // only one thread will see the reduce_count be zero and the goal match the size
      task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
      task->streams[i].goal = size;
      auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
      if (1 == c && (task->streams[i].size >= size)) {
        /* the goal is already met: release the task */
        release_task(task);
      }
    }
  }
3249 
3252  template <std::size_t i, typename Key = keyT>
3253  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_argstream_size(std::size_t size) {
3254  // preconditions
3255  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
3256  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
3257 
3258  // body
3259  const auto owner = keymap();
3260  if (owner != world.rank()) {
3261  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
3262  using msg_t = detail::msg_t;
3263  auto &world_impl = world.impl();
3264  uint64_t pos = 0;
3265  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3267  world_impl.rank(), 0);
3268  pos = pack(size, msg->bytes, pos);
3269  parsec_taskpool_t *tp = world_impl.taskpool();
3270  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3271  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3272  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3273  sizeof(msg_header_t) + pos);
3274  } else {
3275  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
3276 
3277  parsec_key_t hk = 0;
3278  task_t *task;
3279  parsec_hash_table_lock_bucket(&tasks_table, hk);
3280  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
3281  task = create_new_task(ttg::Void{});
3282  world.impl().increment_created();
3283  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
3284  if( world.impl().dag_profiling() ) {
3285 #if defined(PARSEC_PROF_GRAPHER)
3286  parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
3287 #endif
3288  }
3289  }
3290  parsec_hash_table_unlock_bucket(&tasks_table, hk);
3291 
3292  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3293  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3294 
3295  // commit changes
3296  // 1) "lock" the stream by incrementing the reduce_count
3297  // 2) set the goal
3298  // 3) "unlock" the stream
3299  // only one thread will see the reduce_count be zero and the goal match the size
3300  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3301  task->streams[i].goal = size;
3302  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3303  if (1 == c && (task->streams[i].size >= size)) {
3304  release_task(task);
3305  }
3306  }
3307  }
3308 
3310  template <std::size_t i, typename Key>
3311  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> finalize_argstream(const Key &key) {
3312  // preconditions
3313  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3314 
3315  // body
3316  const auto owner = keymap(key);
3317  if (owner != world.rank()) {
3318  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
3319  using msg_t = detail::msg_t;
3320  auto &world_impl = world.impl();
3321  uint64_t pos = 0;
3322  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3324  world_impl.rank(), 1);
3325  /* pack the key */
3326  pos = pack(key, msg->bytes, pos);
3327  parsec_taskpool_t *tp = world_impl.taskpool();
3328  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3329  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3330  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3331  sizeof(msg_header_t) + pos);
3332  } else {
3333  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
3334 
3335  auto hk = reinterpret_cast<parsec_key_t>(&key);
3336  task_t *task = nullptr;
3337  //parsec_hash_table_lock_bucket(&tasks_table, hk);
3338  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3339  ttg::print_error(world.rank(), ":", get_name(), ":", key,
3340  " : error finalize called on stream that never received an input data: ", i);
3341  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3342  }
3343 
3344  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3345  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3346 
3347  // commit changes
3348  // 1) "lock" the stream by incrementing the reduce_count
3349  // 2) set the goal
3350  // 3) "unlock" the stream
3351  // only one thread will see the reduce_count be zero and the goal match the size
3352  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3353  task->streams[i].goal = 1;
3354  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3355  if (1 == c && (task->streams[i].size >= 1)) {
3356  release_task(task);
3357  }
3358  }
3359  }
3360 
3362  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
3363  std::enable_if_t<key_is_void, void> finalize_argstream() {
3364  // preconditions
3365  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3366 
3367  // body
3368  const auto owner = keymap();
3369  if (owner != world.rank()) {
3370  ttg::trace(world.rank(), ":", get_name(), ": forwarding stream finalize for terminal ", i);
3371  using msg_t = detail::msg_t;
3372  auto &world_impl = world.impl();
3373  uint64_t pos = 0;
3374  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3376  world_impl.rank(), 0);
3377  parsec_taskpool_t *tp = world_impl.taskpool();
3378  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3379  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3380  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3381  sizeof(msg_header_t) + pos);
3382  } else {
3383  ttg::trace(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i);
3384 
3385  auto hk = static_cast<parsec_key_t>(0);
3386  task_t *task = nullptr;
3387  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3388  ttg::print_error(world.rank(), ":", get_name(),
3389  " : error finalize called on stream that never received an input data: ", i);
3390  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3391  }
3392 
3393  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3394  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3395 
3396  // commit changes
3397  // 1) "lock" the stream by incrementing the reduce_count
3398  // 2) set the goal
3399  // 3) "unlock" the stream
3400  // only one thread will see the reduce_count be zero and the goal match the size
3401  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3402  task->streams[i].goal = 1;
3403  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3404  if (1 == c && (task->streams[i].size >= 1)) {
3405  release_task(task);
3406  }
3407  }
3408  }
3409 
3411 
3412  assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task);
3413  parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task;
3414  auto check_parsec_data = [&](parsec_data_t* data) {
3415  if (data->owner_device != 0) {
3416  /* find the flow */
3417  int flowidx = 0;
3418  while (flowidx < MAX_PARAM_COUNT &&
3419  gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3420  if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) {
3421  /* found the right data, set the corresponding flow as pushout */
3422  break;
3423  }
3424  ++flowidx;
3425  }
3426  if (flowidx == MAX_PARAM_COUNT) {
3427  throw std::runtime_error("Cannot add more than MAX_PARAM_COUNT flows to a task!");
3428  }
3429  if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3430  /* no flow found, add one and mark it pushout */
3431  detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0];
3432  gpu_task->flow_nb_elts[flowidx] = data->nb_elts;
3433  }
3434  /* need to mark the flow WRITE to convince PaRSEC that the data changed */
3435  ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_WRITE;
3436  gpu_task->pushout |= 1<<flowidx;
3437  }
3438  };
3439  copy->foreach_parsec_data(check_parsec_data);
3440  }
3441 
3442 
  /* check whether a data needs to be pushed out */
  /// Common implementation behind the prepare_send() overloads: decides whether the
  /// device-side copy of \p value must be pushed out to the host before the send,
  /// and calls copy_mark_pushout() when it must. \p remote_check is a callable that
  /// returns true iff at least one destination task lives on a remote rank.
  /// NOTE(review): several source lines of this function were lost during
  /// extraction (marked below); restore them from the upstream file before editing.
  template <std::size_t i, typename Value, typename RemoteCheckFn>
  std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
                   void>
  do_prepare_send(const Value &value, RemoteCheckFn&& remote_check) {
    constexpr const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;

    /* get the copy */
    // NOTE(review): the lines declaring and looking up `copy` are missing here — restore from upstream

    /* if there is no copy we don't need to prepare anything */
    if (nullptr == copy) {
      return;
    }

    bool need_pushout = false;
    // NOTE(review): the condition guarding the early return below is missing — restore from upstream
      /* already marked pushout, skip the rest */
      return;
    }

    /* TODO: remove this once we support reductions on the GPU */
    auto &reducer = std::get<i>(input_reducers);
    if (reducer) {
      /* reductions are currently done only on the host so push out */
      copy_mark_pushout(copy);
      return;
    }

    if constexpr (value_is_const) {
      // NOTE(review): a condition line is missing here — restore from upstream
      /* The data has been modified previously. If not all devices can access
       * their peers then we need to push out to the host so that all devices
       * have the data available for reading.
       * NOTE: we currently don't allow users to force the next writer to be
       * on a different device. In that case PaRSEC would take the host-side
       * copy. If we change our restriction we need to revisit this.
       * Ideally, PaRSEC would take the device copy if the owner moves... */
      need_pushout = !detail::all_devices_peer_access;
    }

    /* check for multiple readers */
    // NOTE(review): the reader-check condition/body lines are missing here — restore from upstream
    }

    // NOTE(review): the writer-check condition line is missing here — restore from upstream
      /* there is a writer already, we will need to create a copy */
      need_pushout = true;
    }

    // NOTE(review): condition lines missing in this branch — restore from upstream
    } else {
        need_pushout = true;
      } else {
        /* there are readers, we will need to create a copy */
        need_pushout = true;
      }
    }
    }

    // host-only TTs always need the data on the host
    if constexpr (!derived_has_device_op()) {
      need_pushout = true;
    }

    /* check if there are non-local successors if it's a device task */
    if (!need_pushout) {
      bool device_supported = false;
      if constexpr (derived_has_cuda_op()) {
        device_supported = world.impl().mpi_support(ttg::ExecutionSpace::CUDA);
      } else if constexpr (derived_has_hip_op()) {
        device_supported = world.impl().mpi_support(ttg::ExecutionSpace::HIP);
      } else if constexpr (derived_has_level_zero_op()) {
        device_supported = world.impl().mpi_support(ttg::ExecutionSpace::L0);
      }
      /* if MPI supports the device we don't care whether we have remote peers
       * because we can send from the device directly */
      if (!device_supported) {
        need_pushout = remote_check();
      }
    }

    if (need_pushout) {
      copy_mark_pushout(copy);
    }
  }
3538 
3539  /* check whether a data needs to be pushed out */
3540  template <std::size_t i, typename Key, typename Value>
3541  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3542  void>
3543  prepare_send(const ttg::span<const Key> &keylist, const Value &value) {
3544  auto remote_check = [&](){
3545  auto world = ttg_default_execution_context();
3546  int rank = world.rank();
3547  bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3548  [&](const Key &key) { return keymap(key) != rank; });
3549  return remote;
3550  };
3551  do_prepare_send<i>(value, remote_check);
3552  }
3553 
3554  template <std::size_t i, typename Key, typename Value>
3555  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3556  void>
3557  prepare_send(const Value &value) {
3558  auto remote_check = [&](){
3559  auto world = ttg_default_execution_context();
3560  int rank = world.rank();
3561  return (keymap() != rank);
3562  };
3563  do_prepare_send<i>(value, remote_check);
3564  }
3565 
 private:
  // Non-copyable and non-movable by design.
  // Copy/assign/move forbidden ... we could make it work using
  // PIMPL for this base class. However, this instance of the base
  // class is tied to a specific instance of a derived class a
  // pointer to which is captured for invoking derived class
  // functions. Thus, not only does the derived class has to be
  // involved but we would have to do it in a thread safe way
  // including for possibly already running tasks and remote
  // references. This is not worth the effort ... wherever you are
  // wanting to move/assign an TT you should be using a pointer.
  TT(const TT &other) = delete;
  TT &operator=(const TT &other) = delete;
  TT(TT &&other) = delete;
  TT &operator=(TT &&other) = delete;
3580 
  // Registers the callback for the i'th input terminal
  /// Installs send/move/broadcast/setsize/finalize/prepare-send callbacks on the
  /// terminal, dispatching at compile time on whether keyT and the terminal's
  /// value type are void. Pull terminals are counted in num_pullins.
  template <typename terminalT, std::size_t i>
  void register_input_callback(terminalT &input) {
    using valueT = std::decay_t<typename terminalT::value_type>;
    if (input.is_pull_terminal) {
      num_pullins++;
    }
    // case 1: nonvoid key, nonvoid value
    if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
      auto move_callback = [this](const keyT &key, valueT &&value) {
        set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
      };
      auto send_callback = [this](const keyT &key, const valueT &value) {
        set_arg<i, keyT, const valueT &>(key, value);
      };
      auto broadcast_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
        broadcast_arg<i, keyT, valueT>(keylist, value);
      };
      auto prepare_send_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
        prepare_send<i, keyT, valueT>(keylist, value);
      };
      auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
      auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
      input.set_callback(send_callback, move_callback, broadcast_callback,
                         setsize_callback, finalize_callback, prepare_send_callback);
    }
    // case 2: nonvoid key, void value, mixed inputs
    else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
      auto send_callback = [this](const keyT &key) { set_arg<i, keyT, ttg::Void>(key, ttg::Void{}); };
      auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
      auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
      input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
    }
    // case 3: nonvoid key, void value, no inputs
    // NOTE: subsumed in case 2 above, kept for historical reasons
    // case 4: void key, nonvoid value
    else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
      auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
      auto send_callback = [this](const valueT &value) {
        if constexpr (std::is_copy_constructible_v<valueT>) {
          set_arg<i, keyT, const valueT &>(value);
        }
        else {
          // non-copyable data can only be moved; sending a const ref would require a copy
          throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() + " which is not copy constructible, std::move datum into send/broadcast statement");
        }
      };
      auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
      auto finalize_callback = [this]() { finalize_argstream<i>(); };
      auto prepare_send_callback = [this](const valueT &value) {
        prepare_send<i, void>(value);
      };
      input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
    }
    // case 5: void key, void value, mixed inputs
    else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
      auto send_callback = [this]() { set_arg<i, keyT, ttg::Void>(ttg::Void{}); };
      auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
      auto finalize_callback = [this]() { finalize_argstream<i>(); };
      input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
    }
    // case 6: void key, void value, no inputs
    // NOTE: subsumed in case 5 above, kept for historical reasons
    else
      ttg::abort();
  }
3658 
3659  template <std::size_t... IS>
3660  void register_input_callbacks(std::index_sequence<IS...>) {
3661  int junk[] = {
3662  0,
3663  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3664  0)...};
3665  junk[0]++;
3666  }
3667 
3668  template <std::size_t... IS, typename inedgesT>
3669  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3670  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3671  junk[0]++;
3672  }
3673 
3674  template <std::size_t... IS, typename outedgesT>
3675  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3676  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3677  junk[0]++;
3678  }
3679 
3680 #if 0
3681  template <typename input_terminals_tupleT, std::size_t... IS, typename flowsT>
3682  void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3683  int junk[] = {0,
3684  (*(const_cast<std::remove_const_t<decltype(flows[IS]->flow_flags)> *>(&(flows[IS]->flow_flags))) =
3685  (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3686  : PARSEC_FLOW_ACCESS_RW),
3687  0)...};
3688  junk[0]++;
3689  }
3690 
3691  template <typename input_terminals_tupleT, typename flowsT>
3692  void initialize_flows(flowsT &&flows) {
3693  _initialize_flows<input_terminals_tupleT>(
3694  std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
3695  }
3696 #endif // 0
3697 
3698  void fence() override { ttg::default_execution_context().impl().fence(); }
3699 
3700  static int key_equal(parsec_key_t a, parsec_key_t b, void *user_data) {
3701  if constexpr (std::is_same_v<keyT, void>) {
3702  return 1;
3703  } else {
3704  keyT &ka = *(reinterpret_cast<keyT *>(a));
3705  keyT &kb = *(reinterpret_cast<keyT *>(b));
3706  return ka == kb;
3707  }
3708  }
3709 
3710  static uint64_t key_hash(parsec_key_t k, void *user_data) {
3711  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3712  if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3713  return 0;
3714  } else {
3715  keyT &kk = *(reinterpret_cast<keyT *>(k));
3716  using ttg::hash;
3717  uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3718  return hv;
3719  }
3720  }
3721 
3722  static char *key_print(char *buffer, size_t buffer_size, parsec_key_t k, void *user_data) {
3723  if constexpr (std::is_same_v<keyT, void>) {
3724  buffer[0] = '\0';
3725  return buffer;
3726  } else {
3727  keyT kk = *(reinterpret_cast<keyT *>(k));
3728  std::stringstream iss;
3729  iss << kk;
3730  memset(buffer, 0, buffer_size);
3731  iss.get(buffer, buffer_size);
3732  return buffer;
3733  }
3734  }
3735 
3736  static parsec_key_t make_key(const parsec_taskpool_t *tp, const parsec_assignment_t *as) {
3737  // we use the parsec_assignment_t array as a scratchpad to store the hash and address of the key
3738  keyT *key = *(keyT**)&(as[2]);
3739  return reinterpret_cast<parsec_key_t>(key);
3740  }
3741 
3742  static char *parsec_ttg_task_snprintf(char *buffer, size_t buffer_size, const parsec_task_t *parsec_task) {
3743  if(buffer_size == 0)
3744  return buffer;
3745 
3746  if constexpr (ttg::meta::is_void_v<keyT>) {
3747  snprintf(buffer, buffer_size, "%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3748  } else {
3749  const task_t *task = reinterpret_cast<const task_t*>(parsec_task);
3750  std::stringstream ss;
3751  ss << task->key;
3752 
3753  std::string keystr = ss.str();
3754  std::replace(keystr.begin(), keystr.end(), '(', ':');
3755  std::replace(keystr.begin(), keystr.end(), ')', ':');
3756 
3757  snprintf(buffer, buffer_size, "%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3758  }
3759  return buffer;
3760  }
3761 
#if defined(PARSEC_PROF_TRACE)
  /// PaRSEC profiling callback: render the task's key (or "()" for void keys)
  /// into the profiling buffer \p dst of capacity \p size; returns \p dst.
  static void *parsec_ttg_task_info(void *dst, const void *data, size_t size)
  {
    const task_t *task = reinterpret_cast<const task_t *>(data);

    if constexpr (ttg::meta::is_void_v<keyT>) {
      snprintf(reinterpret_cast<char*>(dst), size, "()");
    } else {
      // stream the key through operator<< and truncate to the buffer size
      std::stringstream ss;
      ss << task->key;
      snprintf(reinterpret_cast<char*>(dst), size, "%s", ss.str().c_str());
    }
    return dst;
  }
#endif
3777 
3778  parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3779 
3780  template<std::size_t I>
3781  inline static void increment_data_version_impl(task_t *task) {
3782  if constexpr (!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
3783  if (task->copies[I] != nullptr){
3784  task->copies[I]->inc_current_version();
3785  }
3786  }
3787  }
3788 
3789  template<std::size_t... Is>
3790  inline static void increment_data_versions(task_t *task, std::index_sequence<Is...>) {
3791  /* increment version of each mutable data */
3792  int junk[] = {0, (increment_data_version_impl<Is>(task), 0)...};
3793  junk[0]++;
3794  }
3795 
  /// PaRSEC complete_execution hook: resumes a suspended device coroutine one last
  /// time to issue its deferred sends, releases the task's data copies, and runs
  /// any registered constraint-complete callbacks. Always returns DONE.
  /// NOTE(review): two source lines were lost during extraction (marked below);
  /// restore them from the upstream file before editing.
  static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {

    //std::cout << "complete_task_and_release: task " << parsec_task << std::endl;

    task_t *task = (task_t*)parsec_task;

#ifdef TTG_HAVE_COROUTINE
    /* if we still have a coroutine handle we invoke it one more time to get the sends/broadcasts */
    if (task->suspended_task_address) {
      assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
#ifdef TTG_HAVE_DEVICE
      if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
        /* increment versions of all data we might have modified
         * this must happen before we issue the sends */
        //increment_data_versions(task, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});

        // get the device task from the coroutine handle
        auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

        // get the promise which contains the views
        auto dev_data = dev_task.promise();

        assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
               dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);

        /* execute the sends we stored */
        if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
          /* set the current task, needed inside the sends */
          // NOTE(review): the line assigning detail::parsec_ttg_caller is missing here
          // (it is reset to nullptr below) — restore from upstream
          auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
          task->tt->set_outputs_tls_ptr();
          dev_data.do_sends(); // all sends happen here
          task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
          detail::parsec_ttg_caller = nullptr;
        }
        dev_task.destroy(); // safe to destroy the coroutine now
      }
#endif // TTG_HAVE_DEVICE
      /* the coroutine should have completed and we cannot access the promise anymore */
      task->suspended_task_address = nullptr;
    }
#endif // TTG_HAVE_COROUTINE

    /* release our data copies */
    for (int i = 0; i < task->data_count; i++) {
      detail::ttg_data_copy_t *copy = task->copies[i];
      if (nullptr == copy) continue;
      // NOTE(review): the line releasing the copy is missing here — restore from upstream
      task->copies[i] = nullptr;
    }

    // notify constraint machinery that this task has finished
    for (auto& c : task->tt->constraints_complete) {
      if constexpr(std::is_void_v<keyT>) {
        c();
      } else {
        c(task->key);
      }
    }
    return PARSEC_HOOK_RETURN_DONE;
  }
3856 
 public:
  /// Primary constructor: registers this TT with the world, sets up input/output
  /// terminals and their callbacks, and builds the PaRSEC task class (`self`):
  /// incarnations for the supported execution spaces, flows, mempool and the
  /// task/constraint hash tables.
  /// \param name      human-readable TT name (also used as the task class name)
  /// \param innames   one name per input terminal (or one "Virtual Control" name)
  /// \param outnames  one name per output terminal
  /// \param world     execution context this TT belongs to
  /// \param keymap_   maps a key to the owning rank (default keymap rebinds to \p world)
  /// \param priomap_  maps a key to a task priority
  /// \throws std::logic_error if the name counts do not match the terminal counts
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
      : ttg::TTBase(name, numinedges, numouts)
      , world(world)
      // if using default keymap, rebind to the given world
      , keymap(std::is_same<keymapT, ttg::detail::default_keymap<keyT>>::value
                   ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
                   : decltype(keymap)(std::forward<keymapT>(keymap_)))
      // NOTE(review): `decltype(keymap)` below — presumably `decltype(priomap)` was
      // intended; harmless if both map types are identical, but worth confirming.
      , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
    // Cannot call these in base constructor since terminals not yet constructed
    if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals");
    if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals");

    auto &world_impl = world.impl();
    world_impl.register_op(this);

    if constexpr (numinedges == numins) {
      register_input_terminals(input_terminals, innames);
    } else {
      // create a name for the virtual control input
      register_input_terminals(input_terminals, std::array<std::string, 1>{std::string("Virtual Control")});
    }
    register_output_terminals(output_terminals, outnames);

    register_input_callbacks(std::make_index_sequence<numinedges>{});
    int i;

    // build the PaRSEC task class descriptor from scratch
    memset(&self, 0, sizeof(parsec_task_class_t));

    self.name = strdup(get_name().c_str());
    self.task_class_id = get_instance_id();
    self.nb_parameters = 0;
    self.nb_locals = 0;
    //self.nb_flows = numflows;
    self.nb_flows = MAX_PARAM_COUNT; // we're not using all flows but have to
 // trick the device handler into looking at all of them

    if( world_impl.profiling() ) {
      // first two ints are used to store the hash of the key.
      self.nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
      // seconds two ints are used to store a pointer to the key of the task.
      self.nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);

      // If we have parameters and locals, we need to define the corresponding dereference arrays
      self.params[0] = &detail::parsec_taskclass_param0;
      self.params[1] = &detail::parsec_taskclass_param1;

      self.locals[0] = &detail::parsec_taskclass_param0;
      self.locals[1] = &detail::parsec_taskclass_param1;
      self.locals[2] = &detail::parsec_taskclass_param2;
      self.locals[3] = &detail::parsec_taskclass_param3;
    }
    self.make_key = make_key;
    self.key_functions = &tasks_hash_fcts;
    self.task_snprintf = parsec_ttg_task_snprintf;

#if defined(PARSEC_PROF_TRACE)
    self.profile_info = &parsec_ttg_task_info;
#endif

    // the taskpool must know about at least task_class_id+1 task classes
    world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
    // function_id_to_instance[self.task_class_id] = this;
    //self.incarnations = incarnations_array.data();
//#if 0
    // select the chore list (device hook + terminating NONE entry) for the
    // execution space the derived TT implements
    if constexpr (derived_has_cuda_op()) {
      self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
      ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
      ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
      ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_cuda<TT>;
      ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
      ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
      ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
    } else if constexpr (derived_has_hip_op()) {
      self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
      ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP;
      ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
      ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_hip<TT>;

      ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
      ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
      ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
    } else if constexpr (derived_has_level_zero_op()) {
      self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
      ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
      ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
      ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_level_zero<TT>;

      ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
      ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
      ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
#endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
    } else {
      self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
      ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CPU;
      ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
      ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook<TT>;
      ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
      ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
      ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
    }
//#endif // 0

    self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
    self.complete_execution = complete_task_and_release;

    // one input flow per possible parameter slot
    for (i = 0; i < MAX_PARAM_COUNT; i++) {
      parsec_flow_t *flow = new parsec_flow_t;
      flow->name = strdup((std::string("flow in") + std::to_string(i)).c_str());
      flow->sym_type = PARSEC_SYM_INOUT;
      // see initialize_flows below
      // flow->flow_flags = PARSEC_FLOW_ACCESS_RW;
      flow->dep_in[0] = NULL;
      flow->dep_out[0] = NULL;
      flow->flow_index = i;
      flow->flow_datatype_mask = ~0;
      *((parsec_flow_t **)&(self.in[i])) = flow;
    }
    //*((parsec_flow_t **)&(self.in[i])) = NULL;
    //initialize_flows<input_terminals_type>(self.in);

    // one output flow per possible parameter slot
    for (i = 0; i < MAX_PARAM_COUNT; i++) {
      parsec_flow_t *flow = new parsec_flow_t;
      flow->name = strdup((std::string("flow out") + std::to_string(i)).c_str());
      flow->sym_type = PARSEC_SYM_INOUT;
      flow->flow_flags = PARSEC_FLOW_ACCESS_READ; // does PaRSEC use this???
      flow->dep_in[0] = NULL;
      flow->dep_out[0] = NULL;
      flow->flow_index = i;
      flow->flow_datatype_mask = (1 << i);
      *((parsec_flow_t **)&(self.out[i])) = flow;
    }
    //*((parsec_flow_t **)&(self.out[i])) = NULL;

    self.flags = 0;
    self.dependencies_goal = numins; /* (~(uint32_t)0) >> (32 - numins); */

    // total core count across all virtual processes, needed for the mempool
    int nbthreads = 0;
    auto *context = world_impl.context();
    for (int i = 0; i < context->nb_vp; i++) {
      nbthreads += context->virtual_processes[i]->nb_cores;
    }

    parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t), sizeof(task_t),
                             offsetof(parsec_task_t, mempool_owner), nbthreads);

    parsec_hash_table_init(&tasks_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
                           NULL);

    parsec_hash_table_init(&task_constraint_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
                           NULL);
  }
4012 
  /// Constructs a TT in the default execution context.
  /// @param name human-readable name of this TT
  /// @param innames names of the input terminals
  /// @param outnames names of the output terminals
  /// @param keymap the keymap (see set_keymap()); defaults to the default keymap
  /// @param priomap the priority map (see set_priomap()); defaults to the default priomap
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
           std::forward<priomapT>(priomap)) {}
4019 
  /// Constructs a TT in world @p world and connects it to @p inedges and @p outedges.
  /// Input callbacks are registered only after the edges are connected (see comment below).
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
     keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
    connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
    connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
    //DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
    if constexpr (numinedges > 0) {
      register_input_callbacks(std::make_index_sequence<numinedges>{});
    }
  }
  /// Constructs a TT connected to @p inedges / @p outedges in the default execution context.
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
           std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4040 
4041  // Destructor checks for unexecuted tasks
4042  virtual ~TT() {
4043  if(nullptr != self.name ) {
4044  free((void*)self.name);
4045  self.name = nullptr;
4046  }
4047 
4048  for (std::size_t i = 0; i < numins; ++i) {
4049  if (inpute_reducers_taskclass[i] != nullptr) {
4050  std::free(inpute_reducers_taskclass[i]);
4051  inpute_reducers_taskclass[i] = nullptr;
4052  }
4053  }
4054  release();
4055  }
4056 
4057  static void ht_iter_cb(void *item, void *cb_data) {
4058  task_t *task = (task_t *)item;
4059  ttT *op = (ttT *)cb_data;
4060  if constexpr (!ttg::meta::is_void_v<keyT>) {
4061  std::cout << "Left over task " << op->get_name() << " " << task->key << std::endl;
4062  } else {
4063  std::cout << "Left over task " << op->get_name() << std::endl;
4064  }
4065  }
4066 
    // walk the local task hash table and report every task still waiting for inputs
    parsec_hash_table_for_all(&tasks_table, ht_iter_cb, this);
  }
4070 
  /// Releases all resources held by this TT; safe to call multiple times (see do_release()).
  virtual void release() override { do_release(); }
4072 
  /// Tears down all PaRSEC resources owned by this TT.
  /// Idempotent: the `alive` flag ensures only the first call does the work.
  void do_release() {
    if (!alive) {
      return;
    }
    alive = false;
    /* print all outstanding tasks */
    parsec_hash_table_fini(&tasks_table);
    parsec_mempool_destruct(&mempools);
    // uintptr_t addr = (uintptr_t)self.incarnations;
    // free((void *)addr);
    free((__parsec_chore_t *)self.incarnations);
    // release the flow descriptors: names were strdup'ed, flows were new'ed
    for (int i = 0; i < MAX_PARAM_COUNT; i++) {
      if (NULL != self.in[i]) {
        free(self.in[i]->name);
        delete self.in[i];
        self.in[i] = nullptr;
      }
      if (NULL != self.out[i]) {
        free(self.out[i]->name);
        delete self.out[i];
        self.out[i] = nullptr;
      }
    }
    world.impl().deregister_op(this);
  }
4099 
4100  static constexpr const ttg::Runtime runtime = ttg::Runtime::PaRSEC;
4101 
4107  template <std::size_t i, typename Reducer>
4108  void set_input_reducer(Reducer &&reducer) {
4109  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
4110  std::get<i>(input_reducers) = reducer;
4111 
4112  parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4113  if (nullptr == tc) {
4114  tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc));
4115  inpute_reducers_taskclass[i] = tc;
4116 
4117  tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str());
4118  tc->task_class_id = get_instance_id();
4119  tc->nb_parameters = 0;
4120  tc->nb_locals = 0;
4121  tc->nb_flows = numflows;
4122 
4123  auto &world_impl = world.impl();
4124 
4125  if( world_impl.profiling() ) {
4126  // first two ints are used to store the hash of the key.
4127  tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4128  // seconds two ints are used to store a pointer to the key of the task.
4129  tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4130 
4131  // If we have parameters and locals, we need to define the corresponding dereference arrays
4132  tc->params[0] = &detail::parsec_taskclass_param0;
4133  tc->params[1] = &detail::parsec_taskclass_param1;
4134 
4135  tc->locals[0] = &detail::parsec_taskclass_param0;
4136  tc->locals[1] = &detail::parsec_taskclass_param1;
4137  tc->locals[2] = &detail::parsec_taskclass_param2;
4138  tc->locals[3] = &detail::parsec_taskclass_param3;
4139  }
4140  tc->make_key = make_key;
4141  tc->key_functions = &tasks_hash_fcts;
4142  tc->task_snprintf = parsec_ttg_task_snprintf;
4143 
4144 #if defined(PARSEC_PROF_TRACE)
4145  tc->profile_info = &parsec_ttg_task_info;
4146 #endif
4147 
4148  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
4149 
4150 #if 0
4151  // FIXME: currently only support reduction on the host
4152  if constexpr (derived_has_cuda_op()) {
4153  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
4154  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
4155  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
4156  ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda;
4157  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU;
4158  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
4159  ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook;
4160  ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE;
4161  ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL;
4162  ((__parsec_chore_t *)self.incarnations)[2].hook = NULL;
4163  } else
4164 #endif // 0
4165  {
4166  tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
4167  ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4168  ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4169  ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4170  ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4171  ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4172  ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4173  }
4174 
4175  /* the reduction task does not alter the termination detection because the target task will execute */
4176  tc->release_task = &parsec_release_task_to_mempool;
4177  tc->complete_execution = NULL;
4178  }
4179  }
4180 
  /// Sets the reducer for input terminal @p i and a static stream size for all keys.
  /// @param reducer callable that combines a stream of input values into one
  /// @param size number of stream inputs expected on terminal @p i (see set_static_argstream_size)
  template <std::size_t i, typename Reducer>
  void set_input_reducer(Reducer &&reducer, std::size_t size) {
    set_input_reducer<i>(std::forward<Reducer>(reducer));
    set_static_argstream_size<i>(size);
  }
4193 
  // Returns reference to input terminal i to facilitate connection --- terminal
  // cannot be copied, moved or assigned
  /// @tparam i index of the input terminal
  /// @return pointer to the i-th input terminal
  template <std::size_t i>
  std::tuple_element_t<i, input_terminals_type> *in() {
    return &std::get<i>(input_terminals);
  }
4200 
  // Returns reference to output terminal for purpose of connection --- terminal
  // cannot be copied, moved or assigned
  /// @tparam i index of the output terminal
  /// @return pointer to the i-th output terminal
  template <std::size_t i>
  std::tuple_element_t<i, output_terminalsT> *out() {
    return &std::get<i>(output_terminals);
  }
4207 
  // Manual injection of a task with all input arguments specified as a tuple
  /// @param key the task key
  /// @param args tuple with one value per non-void input terminal
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, const input_values_tuple_type &args) {
    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k, args);
    } else {
      /* trigger non-void inputs */
      set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4224 
  // Manual injection of a key-free task and all input arguments specified as a tuple
  /// @param args tuple with one value per non-void input terminal
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const input_values_tuple_type &args) {
    /* trigger non-void inputs */
    set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4236 
  // Manual injection of a task that has no arguments
  /// @param key the task key; converted to key_type if a different (convertible) type is passed
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key) {

    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k);
    } else {
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4252 
  // Manual injection of a task that has no key or arguments
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4261 
4262  // overrides TTBase::invoke()
4263  void invoke() override {
4264  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4265  invoke<keyT>();
4266  else
4267  TTBase::invoke();
4268  }
4269 
4270  private:
  /// Helper for the variadic invoke(): assigns each argument to its input terminal.
  /// Arguments wrapped in a TTG pointer type (ttg::meta::is_ptr_v) flow through
  /// their existing data copy without copying the value; plain values are forwarded.
  template<typename Key, typename Arg, typename... Args, std::size_t I, std::size_t... Is>
  void invoke_arglist(std::index_sequence<I, Is...>, const Key& key, Arg&& arg, Args&&... args) {
    using arg_type = std::decay_t<Arg>;
    if constexpr (ttg::meta::is_ptr_v<arg_type>) {
      /* add a reference to the object */
      auto copy = ttg_parsec::detail::get_copy(arg);
      copy->add_ref();
      /* reset readers so that the value can flow without copying */
      copy->reset_readers();
      auto& val = *arg;
      set_arg_impl<I>(key, val, copy);
      if constexpr (std::is_rvalue_reference_v<Arg>) {
        /* if the ptr was moved in we reset it */
        arg.reset();
      }
    } else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
      set_arg<I>(key, std::forward<Arg>(arg));
    }
    if constexpr (sizeof...(Is) > 0) {
      /* recursive next argument */
      invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
    }
  }
4295 
4296  public:
  // Manual injection of a task with all input arguments specified as variadic arguments
  /// @param key the task key
  /// @param arg,args one argument per task input; pointer-wrapped arguments flow without copying
  template <typename Key = keyT, typename Arg, typename... Args>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, Arg&& arg, Args&&... args) {
    static_assert(sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
                  "Number of arguments to invoke must match the number of task inputs.");
    /* trigger non-void inputs */
    invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
                   std::forward<Arg>(arg), std::forward<Args>(args)...);
    //set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4312 
  /// Sets whether a potential writer is deferred while readers hold the data
  /// (see TTG_PARSEC_DEFER_WRITER for the trade-offs).
  void set_defer_writer(bool value) {
    m_defer_writer = value;
  }
4316 
4317  bool get_defer_writer(bool value) {
4318  return m_defer_writer;
4319  }
4320 
4321  public:
  void make_executable() override {
    // register this TT with the profiling machinery before it becomes executable
    world.impl().register_tt_profiling(this);
  }
4327 
  /// @return const reference to the keymap currently in use
  const decltype(keymap) &get_keymap() const { return keymap; }
4331 
4333  template <typename Keymap>
4334  void set_keymap(Keymap &&km) {
4335  keymap = km;
4336  }
4337 
  /// @return const reference to the priority map currently in use
  const decltype(priomap) &get_priomap() const { return priomap; }
4341 
  /// priomap setter
  /// @param pm callable mapping a key to the priority of the task for that key
  template <typename Priomap>
  void set_priomap(Priomap &&pm) {
    priomap = std::forward<Priomap>(pm);
  }
4348 
  /// Sets the device map used to select the device for a given key.
  /// If @p dm already returns a ttg::device::Device it is stored directly;
  /// otherwise it is wrapped in a lambda that converts its result to a Device.
  template<typename Devicemap>
  void set_devicemap(Devicemap&& dm) {
    static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!");
    if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
      // dm returns a Device
      devicemap = std::forward<Devicemap>(dm);
    } else {
      // convert dm return into a Device
      devicemap = [=](const keyT& key) {
        if constexpr (derived_has_cuda_op()) {
        } else if constexpr (derived_has_hip_op()) {
        } else if constexpr (derived_has_level_zero_op()) {
        } else {
          throw std::runtime_error("Unknown device type!");
          return ttg::device::Device{};
        }
      };
    }
  }
4376 
  /// @return a copy of the devicemap currently in use
  auto get_devicemap() { return devicemap; }
4380 
  /// Adds a shared constraint: registers a release listener plus check and
  /// complete callbacks with this TT.
  /// @param c the constraint, shared between the registered callbacks
  template<typename Constraint>
  void add_constraint(std::shared_ptr<Constraint> c) {
    std::size_t cid = constraints_check.size();
    if constexpr(ttg::meta::is_void_v<keyT>) {
      c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
      constraints_check.push_back([c, this](){ return c->check(this); });
      // NOTE(review): this lambda takes a key parameter while the check lambda above does not — confirm intended
      constraints_complete.push_back([c, this](const keyT& key){ c->complete(this); return true; });
    } else {
      c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
      constraints_check.push_back([c, this](const keyT& key){ return c->check(key, this); });
      constraints_complete.push_back([c, this](const keyT& key){ c->complete(key, this); return true; });
    }
  }
4396 
4399  template<typename Constraint>
4400  void add_constraint(Constraint&& c) {
4401  // need to make this a shared_ptr since it's shared between different callbacks
4402  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4403  }
4404 
  /// Adds a shared constraint whose checked value is derived from the key via @p map.
  /// @param c the constraint; its key_type must match this TT's keyT
  /// @param map callable mapping a task key to the value handed to the constraint
  template<typename Constraint, typename Mapper>
  void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
    static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
    std::size_t cid = constraints_check.size();
    if constexpr(ttg::meta::is_void_v<keyT>) {
      c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
      constraints_check.push_back([map, c, this](){ return c->check(map(), this); });
      constraints_complete.push_back([map, c, this](){ c->complete(map(), this); return true; });
    } else {
      c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
      constraints_check.push_back([map, c, this](const keyT& key){ return c->check(key, map(key), this); });
      constraints_complete.push_back([map, c, this](const keyT& key){ c->complete(key, map(key), this); return true; });
    }
  }
4422 
4426  template<typename Constraint, typename Mapper>
4427  void add_constraint(Constraint c, Mapper&& map) {
4428  // need to make this a shared_ptr since it's shared between different callbacks
4429  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4430  }
4431 
  // Register the static_op function to associate it to instance_id;
  // also drains any messages that were received for this instance before registration
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  ttg::trace("ttg_parsec(", rank, ") Inserting into static_id_to_op_map at ", get_instance_id());
  static_set_arg_fct_call_t call = std::make_pair(&TT::static_set_arg, this);
  auto &world_impl = world.impl();
  static_map_mutex.lock();
  static_id_to_op_map.insert(std::make_pair(get_instance_id(), call));
  if (delayed_unpack_actions.count(get_instance_id()) > 0) {
    auto tp = world_impl.taskpool();

    ttg::trace("ttg_parsec(", rank, ") There are ", delayed_unpack_actions.count(get_instance_id()),
               " messages delayed with op_id ", get_instance_id());

    // collect all delayed actions for this op id while holding the lock ...
    auto se = delayed_unpack_actions.equal_range(get_instance_id());
    std::vector<static_set_arg_fct_arg_t> tmp;
    for (auto it = se.first; it != se.second;) {
      assert(it->first == get_instance_id());
      tmp.push_back(it->second);
      it = delayed_unpack_actions.erase(it);
    }
    static_map_mutex.unlock();

    // ... then unpack them outside the lock
    for (auto it : tmp) {
      if(ttg::tracing())
        ttg::print("ttg_parsec(", rank, ") Unpacking delayed message (", ", ", get_instance_id(), ", ",
                   std::get<1>(it), ", ", std::get<2>(it), ")");
      int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
                                         std::get<0>(it), NULL);
      assert(rc == 0);
      free(std::get<1>(it));
    }

    tmp.clear();
  } else {
    static_map_mutex.unlock();
  }
  }
4471  };
4472 
4473 #include "ttg/make_tt.h"
4474 
4475 } // namespace ttg_parsec
4476 
4482 template <>
4484  private:
4485  ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;
4486  bool do_release = true;
4487 
4488  public:
4489  value_copy_handler() = default;
4492  : copy_to_remove(h.copy_to_remove)
4493  {
4494  h.copy_to_remove = nullptr;
4495  }
4496 
4499  {
4500  std::swap(copy_to_remove, h.copy_to_remove);
4501  return *this;
4502  }
4503 
4505  if (nullptr != copy_to_remove) {
4507  if (do_release) {
4508  ttg_parsec::detail::release_data_copy(copy_to_remove);
4509  }
4510  }
4511  }
4512 
  /// Obtains (or creates) the data copy tracking @p value in the current task
  /// and returns a reference into that copy's storage, so the value can flow
  /// to successors without an extra copy where possible.
  template <typename Value>
  inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&> operator()(Value &&value) {
    constexpr auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
    using value_type = std::remove_reference_t<Value>;
    static_assert(value_is_rvref ||
                  std::is_copy_constructible_v<std::decay_t<Value>>,
                  "Data sent without being moved must be copy-constructible!");

    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }

    copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
    value_type *value_ptr = &value;
    if (nullptr == copy) {
      // value not yet tracked by the task: create a new copy, attach it to the
      // task, and continue with the copy's storage
      copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      value_ptr = reinterpret_cast<value_type *>(copy->get_ptr());
      copy_to_remove = copy;
    } else {
      if constexpr (value_is_rvref) {
        /* this copy won't be modified anymore so mark it as read-only */
        copy->reset_readers();
      }
    }
    /* We're coming from a writer so mark the data as modified.
     * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). */
    if constexpr (value_is_rvref)
      return std::move(*value_ptr);
    else
      return *value_ptr;
  }
4553 
  /// Handles values wrapped in a persistent_value_ref: the referenced value is
  /// itself derived from a data copy, so no new copy is created — references
  /// are added to keep it alive instead.
  template<typename Value>
  inline std::add_lvalue_reference_t<Value> operator()(ttg_parsec::detail::persistent_value_ref<Value> vref) {
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }
    copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref);
    if (nullptr == copy) {
      // no need to create a new copy since it's derived from the copy already
      copy = const_cast<ttg_parsec::detail::ttg_data_copy_t *>(static_cast<const ttg_parsec::detail::ttg_data_copy_t *>(&vref.value_ref));
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      copy_to_remove = copy; // we want to remove the copy from the task once done sending
      do_release = true; // NOTE(review): enables release_data_copy in the destructor; the previous comment claimed the opposite — confirm intent
      copy->add_ref(); // add a reference so that TTG does not attempt to delete this object
      copy->add_ref(); // add another reference so that TTG never attempts to free this copy
      if (copy->num_readers() == 0) {
        /* add at least one reader (the current task) */
        copy->increment_readers<false>();
      }
    }
    return vref.value_ref;
  }
4578 
  /// Const lvalue path: looks up the data copy tracking @p value in the current
  /// task and returns a reference to the tracked storage.
  template <typename Value>
  inline const Value &operator()(const Value &value) {
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }
    copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
    const Value *value_ptr = &value;
    if (nullptr == copy) {
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      value_ptr = reinterpret_cast<Value *>(copy->get_ptr());
      copy_to_remove = copy;
    }
    return *value_ptr;
  }
4602 
4603 };
4604 
4605 #endif // PARSEC_TTG_H_INCLUDED
4606 // clang-format on
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:277
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:31
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
Definition: tt.h:187
auto get_instance_id() const
Definition: tt.h:259
virtual void make_executable()=0
Marks this executable.
Definition: tt.h:287
bool tracing() const
Definition: tt.h:178
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:218
TTBase(TTBase &&other)
Definition: tt.h:116
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:85
bool is_lazy_pull()
Definition: tt.h:200
const TTBase * ttg_ptr() const
Definition: tt.h:206
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:92
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
Represents a device in a specific execution space.
Definition: device.h:14
void print_incomplete_tasks()
Definition: ttg.h:4067
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:4427
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4035
ttg::World get_world() const override final
Definition: ttg.h:1341
static constexpr int numinvals
Definition: ttg.h:1253
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
Definition: ttg.h:1919
void set_priomap(Priomap &&pm)
Definition: ttg.h:4345
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:4108
std::tuple_element_t< i, input_terminals_type > * in()
Definition: ttg.h:4197
void set_devicemap(Devicemap &&dm)
Definition: ttg.h:4355
void set_keymap(Keymap &&km)
keymap setter
Definition: ttg.h:4334
void add_constraint(std::shared_ptr< Constraint > c)
Definition: ttg.h:4384
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:1221
void finalize_argstream_from_msg(void *data, std::size_t size)
Definition: ttg.h:2225
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:4189
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
Definition: ttg.h:2924
bool check_constraints(task_t *task)
Definition: ttg.h:2568
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
Definition: ttg.h:3447
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:1244
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:2954
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
Definition: ttg.h:2698
task_t * create_new_task(const Key &key)
Definition: ttg.h:2311
virtual ~TT()
Definition: ttg.h:4042
output_terminalsT output_terminals_type
Definition: ttg.h:1257
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:1242
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:4409
static auto & get(InTuple &&intuple)
Definition: ttg.h:1265
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
Definition: ttg.h:2710
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Definition: ttg.h:4210
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Definition: ttg.h:4255
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4015
void set_defer_writer(bool value)
Definition: ttg.h:4313
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
Definition: ttg.h:2721
virtual void release() override
Definition: ttg.h:4071
bool get_defer_writer(bool value)
Definition: ttg.h:4317
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
Definition: ttg.h:2716
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3132
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
Definition: ttg.h:2287
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:3153
static void ht_iter_cb(void *item, void *cb_data)
Definition: ttg.h:4057
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3144
parsec_thread_mempool_t * get_task_mempool(void)
Definition: ttg.h:1979
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:3192
void do_release()
Definition: ttg.h:4073
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2645
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
Definition: ttg.h:3253
const auto & get_output_terminals() const
Definition: ttg.h:1282
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Definition: ttg.h:4227
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:1231
auto get_devicemap()
Definition: ttg.h:4379
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:3860
decltype(keymap) const & get_keymap() const
Definition: ttg.h:4330
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:1250
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:3363
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:3543
void copy_mark_pushout(detail::ttg_data_copy_t *copy)
Definition: ttg.h:3410
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
Definition: ttg.h:2338
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
Definition: ttg.h:4022
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
Definition: ttg.h:1906
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:3173
static constexpr const ttg::Runtime runtime
Definition: ttg.h:4100
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
Definition: ttg.h:1934
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
Definition: ttg.h:1987
static resultT get(InTuple &&intuple)
Definition: ttg.h:1261
void register_static_op_function(void)
Definition: ttg.h:4433
void add_constraint(Constraint &&c)
Definition: ttg.h:4400
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:3164
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:1258
void argstream_set_size_from_msg(void *data, std::size_t size)
Definition: ttg.h:2244
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
Definition: ttg.h:2299
void set_arg_from_msg(void *data, std::size_t size)
Definition: ttg.h:2048
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
Definition: ttg.h:2305
keyT key_type
Definition: ttg.h:1241
void invoke() override
Definition: ttg.h:4263
void make_executable() override
Marks this executable.
Definition: ttg.h:4322
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
Definition: ttg.h:2609
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:1249
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:1251
static constexpr bool derived_has_hip_op()
Definition: ttg.h:1226
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:1247
decltype(priomap) const & get_priomap() const
Definition: ttg.h:4340
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
Definition: ttg.h:4299
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:3311
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Definition: ttg.h:4239
std::tuple_element_t< i, output_terminalsT > * out()
Definition: ttg.h:4204
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2365
static constexpr bool derived_has_device_op()
Definition: ttg.h:1236
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
Definition: ttg.h:2293
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
Definition: ttg.h:2281
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
Definition: ttg.h:2585
void get_from_pull_msg(void *data, std::size_t size)
Definition: ttg.h:2267
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
Definition: ttg.h:2751
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:2705
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
Definition: ttg.h:3557
actual_input_tuple_type input_args_type
Definition: ttg.h:1243
void increment_created()
Definition: ttg.h:456
virtual void execute() override
Definition: ttg.h:397
static constexpr int parsec_ttg_rma_tag()
Definition: ttg.h:393
void decrement_inflight_msg()
Definition: ttg.h:459
WorldImpl & operator=(const WorldImpl &other)=delete
void destroy_tpool()
Definition: ttg.h:408
const ttg::Edge & ctl_edge() const
Definition: ttg.h:454
void increment_inflight_msg()
Definition: ttg.h:458
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
Definition: ttg.h:488
void create_tpool()
Definition: ttg.h:335
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
Definition: ttg.h:273
ttg::Edge & ctl_edge()
Definition: ttg.h:452
MPI_Comm comm() const
Definition: ttg.h:395
bool mpi_support(ttg::ExecutionSpace space)
Definition: ttg.h:502
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
Definition: ttg.h:517
virtual bool profiling() override
Definition: ttg.h:500
virtual void dag_off() override
Definition: ttg.h:479
virtual void fence_impl(void) override
Definition: ttg.h:556
virtual void dag_on(const std::string &filename) override
Definition: ttg.h:463
static constexpr int parsec_ttg_tag()
Definition: ttg.h:392
virtual void final_task() override
Definition: ttg.h:506
auto * taskpool()
Definition: ttg.h:333
virtual void destroy() override
Definition: ttg.h:421
virtual void profile_on() override
Definition: ttg.h:494
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
Definition: ttg.h:461
auto * execution_stream()
Definition: ttg.h:332
auto * context()
Definition: ttg.h:331
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
Definition: ttg.h:844
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
bool force_device_comm()
Definition: env.cpp:33
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
void set_current(int device, Stream stream)
Definition: device.h:128
void reset_current()
Definition: device.h:123
constexpr bool is_const_v
Definition: meta.h:325
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:633
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
Definition: ptr.h:277
bool & initialized_mpi()
Definition: ttg.h:226
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
Definition: ttg.h:826
bool all_devices_peer_access
Definition: ttg.h:231
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:794
int first_device_id
Definition: device.h:12
std::size_t max_inline_size
Definition: ttg.h:192
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:648
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
const parsec_symbol_t parsec_taskclass_param1
Definition: ttg.h:608
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
Definition: ttg.h:806
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:662
constexpr const int PARSEC_TTG_MAX_AM_SIZE
Definition: ttg.h:174
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:676
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:763
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
Definition: ttg.h:941
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
Definition: ttg.h:816
const parsec_symbol_t parsec_taskclass_param2
Definition: ttg.h:616
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:783
ttg_data_copy_t * create_new_datacopy(Value &&value)
Definition: ttg.h:704
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:772
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
Definition: thread_local.h:12
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
Definition: ttg.h:756
void release_data_copy(ttg_data_copy_t *copy)
Definition: ttg.h:888
void transfer_ownership_impl(ttg_data_copy_t *copy, int device)
Definition: ttg.h:748
const parsec_symbol_t parsec_taskclass_param3
Definition: ttg.h:624
const parsec_symbol_t parsec_taskclass_param0
Definition: ttg.h:600
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
void ttg_fence(ttg::World world)
Definition: ttg.h:1119
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
Definition: ttg.h:138
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
Definition: ttg.h:136
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
Definition: ttg.h:139
void ttg_finalize()
Definition: ttg.h:1105
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:1122
std::mutex static_map_mutex
Definition: ttg.h:137
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:1136
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:1140
void make_executable_hook(ttg::World &)
Definition: ttg.h:1148
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
Definition: ttg.h:134
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:1131
ttg::World ttg_default_execution_context()
Definition: ttg.h:1115
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
Definition: ttg.h:135
void ttg_execute(ttg::World world)
Definition: ttg.h:1118
void ttg_sum(ttg::World world, double &value)
Definition: ttg.h:1142
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
ExecutionSpace
denotes task execution space
Definition: execution.h:17
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
Runtime
Definition: runtimes.h:15
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:130
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
bool tracing()
returns whether tracing is enabled
Definition: trace.h:28
int rank(World world=default_execution_context())
Definition: run.h:85
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Definition: parsec-ext.h:8
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
Definition: ttg.h:4580
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
Definition: ttg.h:4514
value_copy_handler & operator=(value_copy_handler &&h)
Definition: ttg.h:4498
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
Definition: ttg.h:4555
value_copy_handler & operator=(const value_copy_handler &h)=delete
A container for types.
Definition: typelist.h:24
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
parsec_hash_table_t task_constraint_table
Definition: ttg.h:1179
parsec_hash_table_t tasks_table
Definition: ttg.h:1178
parsec_gpu_task_t * gpu_task
Definition: task.h:14
msg_header_t tt_id
Definition: ttg.h:177
static constexpr std::size_t max_payload_size
Definition: ttg.h:178
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
Definition: ttg.h:182
unsigned char bytes[max_payload_size]
Definition: ttg.h:179
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
std::array< stream_info_t, num_streams > streams
Definition: task.h:210
ttg_data_copy_t * copies[num_copies]
Definition: task.h:216
lvalue_reference_type value_ref
Definition: ttvalue.h:88
static void drop_all_ptr()
Definition: ptr.h:117
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:359
parsec_task_t * get_next_task() const
ttg::span< ttg::iovec > iovec_span()
void transfer_ownership(int access, int device=0)
void set_next_task(parsec_task_t *task)
enum ttg_parsec::msg_header_t::fn_id fn_id_t
uint32_t taskpool_id
Definition: ttg.h:148
std::int8_t num_iovecs
Definition: ttg.h:152
std::size_t key_offset
Definition: ttg.h:150
msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
Definition: ttg.h:160
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13