ttg.h
Go to the documentation of this file.
1 // clang-format off
2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
4 
5 /* set up env if this header was included directly */
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
8 #endif // !defined(TTG_IMPL_NAME)
9 
10 /* Whether to defer a potential writer if there are readers.
11  * This may avoid extra copies in exchange for concurrency.
12  * This may cause deadlocks, so use with caution. */
13 #define TTG_PARSEC_DEFER_WRITER false
14 
15 #include "ttg/config.h"
16 
17 #include "ttg/impl_selector.h"
18 
19 /* include ttg header to make symbols available in case this header is included directly */
20 #include "../../ttg.h"
21 
22 #include "ttg/base/keymap.h"
23 #include "ttg/base/tt.h"
24 #include "ttg/base/world.h"
25 #include "ttg/constraint.h"
26 #include "ttg/edge.h"
27 #include "ttg/execution.h"
28 #include "ttg/func.h"
29 #include "ttg/runtimes.h"
30 #include "ttg/terminal.h"
31 #include "ttg/tt.h"
32 #include "ttg/util/env.h"
33 #include "ttg/util/hash.h"
34 #include "ttg/util/meta.h"
35 #include "ttg/util/meta/callable.h"
36 #include "ttg/util/print.h"
37 #include "ttg/util/scope_exit.h"
38 #include "ttg/util/trace.h"
39 #include "ttg/util/typelist.h"
40 
42 
43 #include "ttg/parsec/fwd.h"
44 
45 #include "ttg/parsec/buffer.h"
48 #include "ttg/parsec/devicefunc.h"
49 #include "ttg/parsec/ttvalue.h"
50 #include "ttg/device/task.h"
51 #include "ttg/parsec/parsec_data.h"
52 
53 #include <algorithm>
54 #include <array>
55 #include <cassert>
56 #include <cstring>
57 #include <experimental/type_traits>
58 #include <functional>
59 #include <future>
60 #include <iostream>
61 #include <list>
62 #include <map>
63 #include <memory>
64 #include <mutex>
65 #include <numeric>
66 #include <sstream>
67 #include <string>
68 #include <tuple>
69 #include <vector>
70 
71 // needed for MPIX_CUDA_AWARE_SUPPORT
72 #if defined(TTG_HAVE_MPI)
73 #include <mpi.h>
74 #if defined(TTG_HAVE_MPIEXT)
75 #include <mpi-ext.h>
76 #endif // TTG_HAVE_MPIEXT
77 #endif // TTG_HAVE_MPI
78 
79 
80 #include <parsec.h>
81 #include <parsec/class/parsec_hash_table.h>
82 #include <parsec/data_internal.h>
83 #include <parsec/execution_stream.h>
84 #include <parsec/interfaces/interface.h>
85 #include <parsec/mca/device/device.h>
86 #include <parsec/parsec_comm_engine.h>
87 #include <parsec/parsec_internal.h>
88 #include <parsec/scheduling.h>
89 #include <parsec/remote_dep.h>
90 #include <parsec/utils/mca_param.h>
91 
92 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #include <parsec/mca/device/cuda/device_cuda.h>
94 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
95 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
96 #include <parsec/mca/device/hip/device_hip.h>
97 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
98 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 #include <parsec/mca/device/level_zero/device_level_zero.h>
100 #endif //PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
101 
102 #include <parsec/mca/device/device_gpu.h>
103 #if defined(PARSEC_PROF_TRACE)
104 #include <parsec/profiling.h>
105 #undef PARSEC_TTG_PROFILE_BACKEND
106 #if defined(PARSEC_PROF_GRAPHER)
107 #include <parsec/parsec_prof_grapher.h>
108 #endif
109 #endif
110 #include <cstdlib>
111 #include <cstring>
112 
113 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
114 #include <unordered_set>
115 #endif
116 
118 #include "ttg/parsec/thread_local.h"
119 #include "ttg/parsec/ptr.h"
120 #include "ttg/parsec/task.h"
121 #include "ttg/parsec/parsec-ext.h"
122 
123 #include "ttg/device/device.h"
124 
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
126 
127 /* PaRSEC function declarations */
128 extern "C" {
129 void parsec_taskpool_termination_detected(parsec_taskpool_t *tp);
130 int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks);
131 }
132 
133 namespace ttg_parsec {
 // Registry used to deliver PaRSEC active messages to TT objects:
 // a TT registers a (set-arg function, TT pointer) pair under its 64-bit
 // operation id; messages that arrive before the TT registered are parked
 // in delayed_unpack_actions as (sender rank, malloc'ed payload, size).
134  typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
135  typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
 // op id -> callback; guarded by static_map_mutex
136  inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
137  inline std::mutex static_map_mutex;
138  typedef std::tuple<int, void *, size_t> static_set_arg_fct_arg_t;
 // op id -> messages received before the TT registered (replayed later)
139  inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;
140 
141  struct msg_header_t {
142  typedef enum fn_id : std::int8_t {
148  uint32_t taskpool_id = std::numeric_limits<uint32_t>::max();
149  uint64_t op_id = std::numeric_limits<uint64_t>::max();
150  std::size_t key_offset = 0;
152  std::int8_t num_iovecs = 0;
153  bool inline_data = false;
154  int32_t param_id = -1;
155  int num_keys = 0;
156  int sender = -1;
157 
158  msg_header_t() = default;
159 
160  msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
161  : fn_id(fid)
162  , taskpool_id(tid)
163  , op_id(oid)
164  , param_id(pid)
165  , num_keys(nk)
166  , sender(sender)
167  { }
168  };
169 
170  static void unregister_parsec_tags(void *_);
171 
172  namespace detail {
173 
174  constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1 * 1024*1024;
175 
176  struct msg_t {
178  static constexpr std::size_t max_payload_size = PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t);
179  unsigned char bytes[max_payload_size];
180 
181  msg_t() = default;
182  msg_t(uint64_t tt_id,
183  uint32_t taskpool_id,
184  msg_header_t::fn_id_t fn_id,
185  int32_t param_id,
186  int sender,
187  int num_keys = 1)
188  : tt_id(fn_id, taskpool_id, tt_id, param_id, sender, num_keys)
189  {}
190  };
191 
193 
194  static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
195  int src_rank, void *obj) {
196  static_set_arg_fct_type static_set_arg_fct;
197  parsec_taskpool_t *tp = NULL;
198  msg_header_t *msg = static_cast<msg_header_t *>(data);
199  uint64_t op_id = msg->op_id;
200  tp = parsec_taskpool_lookup(msg->taskpool_id);
201  assert(NULL != tp);
202  static_map_mutex.lock();
203  try {
204  auto op_pair = static_id_to_op_map.at(op_id);
205  static_map_mutex.unlock();
206  tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
207  static_set_arg_fct = op_pair.first;
208  static_set_arg_fct(data, size, op_pair.second);
209  tp->tdm.module->incoming_message_end(tp, NULL);
210  return 0;
211  } catch (const std::out_of_range &e) {
212  void *data_cpy = malloc(size);
213  assert(data_cpy != 0);
214  memcpy(data_cpy, data, size);
215  ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
216  ", ", op_id, ", ", data_cpy, ", ", size, ")");
217  delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, data_cpy, size)));
218  static_map_mutex.unlock();
219  return 1;
220  }
221  }
222 
223  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
224  int src, void *cb_data);
225 
/// Flag recording whether MPI was initialized (by this backend).
/// Returned by non-const reference so callers can both query and update it;
/// backed by a function-local static that starts out false.
inline bool &initialized_mpi() {
  static bool flag{false};
  return flag;
}
230 
232 
233  } // namespace detail
234 
236  ttg::Edge<> m_ctl_edge;
237  bool _dag_profiling;
238  bool _task_profiling;
239  std::array<bool, static_cast<std::size_t>(ttg::ExecutionSpace::Invalid)>
240  mpi_space_support = {true, false, false};
241 
242  int query_comm_size() {
243  int comm_size;
244  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
245  return comm_size;
246  }
247 
248  int query_comm_rank() {
249  int comm_rank;
250  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
251  return comm_rank;
252  }
253 
254  static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine, void *user_data)
255  {
256  parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, user_data, detail::PARSEC_TTG_MAX_AM_SIZE);
257  parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, user_data, 128);
258  }
259 
260  static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine, void *user_data)
261  {
262  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
263  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
264  }
265 
266  public:
267 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
268  int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
269  int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
270  int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
271 #endif
272 
273  WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
274  : WorldImplBase(query_comm_size(), query_comm_rank())
275  , ctx(c)
276  , own_ctx(c == nullptr)
277 #if defined(PARSEC_PROF_TRACE)
278  , profiling_array(nullptr)
279  , profiling_array_size(0)
280 #endif
281  , _dag_profiling(false)
282  , _task_profiling(false)
283  {
285  if (own_ctx) ctx = parsec_init(ncores, argc, argv);
286 
287  /* query MPI device support */
289 #if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
290  || MPIX_Query_cuda_support()
291 #endif // MPIX_CUDA_AWARE_SUPPORT
292  ) {
293  mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::CUDA)] = true;
294  }
295 
297 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
298  || MPIX_Query_hip_support()
299 #endif // MPIX_HIP_AWARE_SUPPORT
300  ) {
301  mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::HIP)] = true;
302  }
303 
304 #if defined(PARSEC_PROF_TRACE)
305  if(parsec_profile_enabled) {
306  profile_on();
307 #if defined(PARSEC_TTG_PROFILE_BACKEND)
308  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_SET_ARG_IMPL", "fill:000000", 0, NULL,
309  (int*)&parsec_ttg_profile_backend_set_arg_start,
310  (int*)&parsec_ttg_profile_backend_set_arg_end);
311  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_BCAST_ARG_IMPL", "fill:000000", 0, NULL,
312  (int*)&parsec_ttg_profile_backend_bcast_arg_start,
313  (int*)&parsec_ttg_profile_backend_bcast_arg_end);
314  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_DATACOPY", "fill:000000",
315  sizeof(size_t), "size{int64_t}",
316  (int*)&parsec_ttg_profile_backend_allocate_datacopy,
317  (int*)&parsec_ttg_profile_backend_free_datacopy);
318 #endif
319  }
320 #endif
321 
322 #ifdef PARSEC_PROF_GRAPHER
323  /* check if PaRSEC's dot tracing is enabled */
324  int dot_param_idx;
325  dot_param_idx = parsec_mca_param_find("profile", NULL, "dot");
326 
327  if (dot_param_idx != PARSEC_ERROR) {
328  char *filename;
329  parsec_mca_param_lookup_string(dot_param_idx, &filename);
330  dag_on(filename);
331  }
332 #endif // PARSEC_PROF_GRAPHER
333 
334  if( NULL != parsec_ce.tag_register) {
335  parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, detail::PARSEC_TTG_MAX_AM_SIZE);
336  parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128);
337  }
338 
339  create_tpool();
340  }
341 
342 
343  auto *context() { return ctx; }
344  auto *execution_stream() { return parsec_my_execution_stream(); }
345  auto *taskpool() { return tpool; }
346 
347  void create_tpool() {
348  assert(nullptr == tpool);
349  tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
350  tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
351  tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task;
352  tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
353  tpool->taskpool_name = strdup("TTG Taskpool");
354  parsec_taskpool_reserve_id(tpool);
355 
356  tpool->devices_index_mask = 0;
357  for(int i = 0; i < (int)parsec_nb_devices; i++) {
358  parsec_device_module_t *device = parsec_mca_device_get(i);
359  if( NULL == device ) continue;
360  tpool->devices_index_mask |= (1 << device->device_index);
361  }
362 
363 #ifdef TTG_USE_USER_TERMDET
364  parsec_termdet_open_module(tpool, "user_trigger");
365 #else // TTG_USE_USER_TERMDET
366  parsec_termdet_open_dyn_module(tpool);
367 #endif // TTG_USE_USER_TERMDET
368  tpool->tdm.module->monitor_taskpool(tpool, parsec_taskpool_termination_detected);
369  // In TTG, we use the pending actions to denote that the
370  // taskpool is not ready, i.e. some local tasks could still
371  // be added by the main thread. It should then be initialized
372  // to 0, execute will set it to 1 and mark the tpool as ready,
373  // and the fence() will decrease it back to 0.
374  tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
375  parsec_taskpool_enable(tpool, NULL, NULL, execution_stream(), size() > 1);
376 
377 #if defined(PARSEC_PROF_TRACE)
378  tpool->profiling_array = profiling_array;
379 #endif
380 
381  // Termination detection in PaRSEC requires to synchronize the
382  // taskpool enabling, to avoid a race condition that would keep
383  // termination detection-related messages in a waiting queue
384  // forever
385  MPI_Barrier(comm());
386 
387  parsec_taskpool_started = false;
388  }
389 
390  /* Deleted copy ctor */
391  WorldImpl(const WorldImpl &other) = delete;
392 
393  /* Deleted move ctor */
394  WorldImpl(WorldImpl &&other) = delete;
395 
396  /* Deleted copy assignment */
397  WorldImpl &operator=(const WorldImpl &other) = delete;
398 
399  /* Deleted move assignment */
400  WorldImpl &operator=(WorldImpl &&other) = delete;
401 
403 
404  static constexpr int parsec_ttg_tag() { return PARSEC_DSL_TTG_TAG; }
405  static constexpr int parsec_ttg_rma_tag() { return PARSEC_DSL_TTG_RMA_TAG; }
406 
407  MPI_Comm comm() const { return MPI_COMM_WORLD; }
408 
409  virtual void execute() override {
410  if (!parsec_taskpool_started) {
411  parsec_enqueue(ctx, tpool);
412  tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
413  tpool->tdm.module->taskpool_ready(tpool);
414  [[maybe_unused]] auto ret = parsec_context_start(ctx);
415  // ignore ret since all of its nonzero values are OK (e.g. -1 due to ctx already being active)
416  parsec_taskpool_started = true;
417  }
418  }
419 
420  void destroy_tpool() {
421 #if defined(PARSEC_PROF_TRACE)
422  // We don't want to release the profiling array, as it should be persistent
423  // between fences() to allow defining a TT/TTG before a fence() and schedule
424  // it / complete it after a fence()
425  tpool->profiling_array = nullptr;
426 #endif
427  assert(NULL != tpool->tdm.monitor);
428  tpool->tdm.module->unmonitor_taskpool(tpool);
429  parsec_taskpool_free(tpool);
430  tpool = nullptr;
431  }
432 
433  virtual void destroy() override {
434  if (is_valid()) {
435  if (parsec_taskpool_started) {
436  // We are locally ready (i.e. we won't add new tasks)
437  tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
438  ttg::trace("ttg_parsec(", this->rank(), "): final waiting for completion");
439  if (own_ctx)
440  parsec_context_wait(ctx);
441  else
442  parsec_taskpool_wait(tpool);
443  }
444  release_ops();
446  destroy_tpool();
447  if (own_ctx) {
448  unregister_parsec_tags(nullptr);
449  } else {
450  parsec_context_at_fini(unregister_parsec_tags, nullptr);
451  }
452 #if defined(PARSEC_PROF_TRACE)
453  if(nullptr != profiling_array) {
454  free(profiling_array);
455  profiling_array = nullptr;
456  profiling_array_size = 0;
457  }
458 #endif
459  if (own_ctx) parsec_fini(&ctx);
460  mark_invalid();
461  }
462  }
463 
 /// @return mutable access to the world-wide control edge
464  ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
465 
 /// @return const access to the world-wide control edge
466  const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
467 
 /// Account a newly created task with the taskpool's termination detector.
468  void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); }
469 
 /// Account an in-flight message as a pending runtime action so the
 /// taskpool cannot be considered terminated while it is outstanding.
470  void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), 1); }
 /// Retire a previously accounted in-flight message.
471  void decrement_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), -1); }
472 
 /// @return whether DAG (dot-graph) tracing is currently enabled
473  bool dag_profiling() override { return _dag_profiling; }
474 
475  virtual void dag_on(const std::string &filename) override {
476 #if defined(PARSEC_PROF_GRAPHER)
477  if(!_dag_profiling) {
478  profile_on();
479  size_t len = strlen(filename.c_str())+32;
480  char ext_filename[len];
481  snprintf(ext_filename, len, "%s-%d.dot", filename.c_str(), rank());
482  parsec_prof_grapher_init(ctx, ext_filename);
483  _dag_profiling = true;
484  }
485 #else
486  ttg::print("Error: requested to create '", filename, "' to create a DAG of tasks,\n"
487  "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
488 #endif
489  }
490 
491  virtual void dag_off() override {
492 #if defined(PARSEC_PROF_GRAPHER)
493  if(_dag_profiling) {
494  parsec_prof_grapher_fini();
495  _dag_profiling = false;
496  }
497 #endif
498  }
499 
 /// Disable task-level profiling (only effective with PARSEC_PROF_TRACE).
500  virtual void profile_off() override {
501 #if defined(PARSEC_PROF_TRACE)
502  _task_profiling = false;
503 #endif
504  }
505 
 /// Enable task-level profiling (only effective with PARSEC_PROF_TRACE).
506  virtual void profile_on() override {
507 #if defined(PARSEC_PROF_TRACE)
508  _task_profiling = true;
509 #endif
510  }
511 
 /// @return whether task-level profiling is currently enabled
512  virtual bool profiling() override { return _task_profiling; }
513 
515  return mpi_space_support[static_cast<std::size_t>(space)];
516  }
517 
 /// Signal the user-triggered termination detector that no more tasks will
 /// be produced: sets the task count to 0 so the taskpool can terminate.
 /// Only meaningful when built with TTG_USE_USER_TERMDET.
518  virtual void final_task() override {
519 #ifdef TTG_USE_USER_TERMDET
520  if(parsec_taskpool_started) {
521  taskpool()->tdm.module->taskpool_set_nb_tasks(taskpool(), 0);
522  parsec_taskpool_started = false;
523  }
524 #endif // TTG_USE_USER_TERMDET
525  }
526 
527  template <typename keyT, typename output_terminalsT, typename derivedT,
528  typename input_valueTs = ttg::typelist<>, ttg::ExecutionSpace Space>
530 #if defined(PARSEC_PROF_TRACE)
531  std::stringstream ss;
532  build_composite_name_rec(t->ttg_ptr(), ss);
533  ss << t->get_name();
534  register_new_profiling_event(ss.str().c_str(), t->get_instance_id());
535 #endif
536  }
537 
538  protected:
539 #if defined(PARSEC_PROF_TRACE)
540  void build_composite_name_rec(const ttg::TTBase *t, std::stringstream &ss) {
541  if(nullptr == t)
542  return;
543  build_composite_name_rec(t->ttg_ptr(), ss);
544  ss << t->get_name() << "::";
545  }
546 
547  void register_new_profiling_event(const char *name, int position) {
548  if(2*position >= profiling_array_size) {
549  size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
550  profiling_array = (int*)realloc((void*)profiling_array,
551  new_profiling_array_size * sizeof(int));
552  memset((void*)&profiling_array[profiling_array_size], 0, sizeof(int)*(new_profiling_array_size - profiling_array_size));
553  profiling_array_size = new_profiling_array_size;
554  tpool->profiling_array = profiling_array;
555  }
556 
557  assert(0 == tpool->profiling_array[2*position]);
558  assert(0 == tpool->profiling_array[2*position+1]);
559  // TODO PROFILING: 0 and NULL should be replaced with something that depends on the key human-readable serialization...
560  // Typically, we would put something like 3*sizeof(int32_t), "m{int32_t};n{int32_t};k{int32_t}" to say
561  // there are three fields, named m, n and k, stored in this order, and each of size int32_t
562  parsec_profiling_add_dictionary_keyword(name, "fill:000000", 64, "key{char[64]}",
563  (int*)&tpool->profiling_array[2*position],
564  (int*)&tpool->profiling_array[2*position+1]);
565  }
566 #endif
567 
568  virtual void fence_impl(void) override {
569  int rank = this->rank();
570  if (!parsec_taskpool_started) {
571  ttg::trace("ttg_parsec::(", rank, "): parsec taskpool has not been started, fence is a simple MPI_Barrier");
572  MPI_Barrier(comm());
573  return;
574  }
575  ttg::trace("ttg_parsec::(", rank, "): parsec taskpool is ready for completion");
576  // We are locally ready (i.e. we won't add new tasks)
577  tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
578  ttg::trace("ttg_parsec(", rank, "): waiting for completion");
579  parsec_taskpool_wait(tpool);
580 
581  // We need the synchronization between the end of the context and the restart of the taskpool
582  // until we use parsec_taskpool_wait and implement an epoch in the PaRSEC taskpool
583  // see Issue #118 (TTG)
584  MPI_Barrier(comm());
585 
586  destroy_tpool();
587  create_tpool();
588  execute();
589  }
590 
591  private:
592  parsec_context_t *ctx = nullptr;
593  bool own_ctx = false; //< whether I own the context
594  parsec_taskpool_t *tpool = nullptr;
595  bool parsec_taskpool_started = false;
596 #if defined(PARSEC_PROF_TRACE)
597  int *profiling_array;
598  std::size_t profiling_array_size;
599 #endif
600  };
601 
602  static void unregister_parsec_tags(void *_pidx)
603  {
604  if(NULL != parsec_ce.tag_unregister) {
605  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
606  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
607  }
608  }
609 
610  namespace detail {
611 
 // Symbols describing the four 64-bit assignment slots every TTG task class
 // exposes to PaRSEC: two hash words (HASH0/HASH1) and two key words
 // (KEY0/KEY1). All are standalone globals without range expressions.
612  const parsec_symbol_t parsec_taskclass_param0 = {
613  .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
614  .name = "HASH0",
615  .context_index = 0,
616  .min = nullptr,
617  .max = nullptr,
618  .expr_inc = nullptr,
619  .cst_inc = 0 };
620  const parsec_symbol_t parsec_taskclass_param1 = {
621  .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
622  .name = "HASH1",
623  .context_index = 1,
624  .min = nullptr,
625  .max = nullptr,
626  .expr_inc = nullptr,
627  .cst_inc = 0 };
628  const parsec_symbol_t parsec_taskclass_param2 = {
629  .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
630  .name = "KEY0",
631  .context_index = 2,
632  .min = nullptr,
633  .max = nullptr,
634  .expr_inc = nullptr,
635  .cst_inc = 0 };
636  const parsec_symbol_t parsec_taskclass_param3 = {
637  .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
638  .name = "KEY1",
639  .context_index = 3,
640  .min = nullptr,
641  .max = nullptr,
642  .expr_inc = nullptr,
643  .cst_inc = 0 };
644 
646  ttg_data_copy_t *res = nullptr;
647  if (task == nullptr || ptr == nullptr) {
648  return res;
649  }
650  for (int i = 0; i < task->data_count; ++i) {
651  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
652  if (NULL != copy && copy->get_ptr() == ptr) {
653  res = copy;
654  break;
655  }
656  }
657  return res;
658  }
659 
660  inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
661  int i = -1;
662  if (task == nullptr || ptr == nullptr) {
663  return i;
664  }
665  for (i = 0; i < task->data_count; ++i) {
666  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
667  if (NULL != copy && copy->get_ptr() == ptr) {
668  return i;
669  }
670  }
671  return -1;
672  }
673 
675  if (task == nullptr || copy == nullptr) {
676  return false;
677  }
678 
679  if (MAX_PARAM_COUNT < task->data_count) {
680  throw std::logic_error("Too many data copies, check MAX_PARAM_COUNT!");
681  }
682 
683  task->copies[task->data_count] = copy;
684  task->data_count++;
685  return true;
686  }
687 
689  int i;
690  /* find and remove entry; copies are usually appended and removed, so start from back */
691  for (i = task->data_count-1; i >= 0; --i) {
692  if (copy == task->copies[i]) {
693  break;
694  }
695  }
696  if (i < 0) return;
697  /* move all following elements one up */
698  for (; i < task->data_count - 1; ++i) {
699  task->copies[i] = task->copies[i + 1];
700  }
701  /* null last element */
702  task->copies[i] = nullptr;
703  task->data_count--;
704  }
705 
706 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
707 #warning "ttg::PaRSEC enables data copy tracking"
708  static std::unordered_set<ttg_data_copy_t *> pending_copies;
709  static std::mutex pending_copies_mutex;
710 #endif
711 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
712  static int64_t parsec_ttg_data_copy_uid = 0;
713 #endif
714 
 /// Create a new data copy holding @p value.
 /// If the decayed value type derives from ttg::TTValue it is itself a
 /// ttg_data_copy_t and is constructed directly; otherwise the value is
 /// wrapped in a ttg_data_value_copy_t. Throws std::logic_error when the
 /// value can be neither moved nor copied into a new object.
715  template <typename Value>
716  inline ttg_data_copy_t *create_new_datacopy(Value &&value) {
717  using value_type = std::decay_t<Value>;
718  ttg_data_copy_t *copy = nullptr;
719  if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type> &&
720  std::is_constructible_v<value_type, decltype(value)>) {
721  copy = new value_type(std::forward<Value>(value));
722  } else if constexpr (std::is_constructible_v<ttg_data_value_copy_t<value_type>, decltype(value)>) {
723  copy = new ttg_data_value_copy_t<value_type>(std::forward<Value>(value));
724  } else {
725  /* we have no way to create a new copy from this value */
726  throw std::logic_error("Trying to copy-construct data that is not copy-constructible!");
727  }
728 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
729  // Keep track of additional memory usage
 // emits a counter event tagged with a fresh uid for this copy
730  if(ttg::default_execution_context().impl().profiling()) {
731  copy->size = sizeof(Value);
732  copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
733  parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_allocate_datacopy,
734  static_cast<uint64_t>(copy->uid),
735  PROFILE_OBJECT_ID_NULL, &copy->size,
736  PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
737  }
738 #endif
739 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
 // debug mode: record the live copy so leaks/double-frees can be detected
740  {
741  const std::lock_guard<std::mutex> lock(pending_copies_mutex);
742  auto rc = pending_copies.insert(copy);
743  assert(std::get<1>(rc));
744  }
745 #endif
746  return copy;
747  }
748 
749 #if 0
750  template <std::size_t... IS, typename Key = keyT>
751  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
752  int junk[] = {0, (invoke_pull_terminal<IS>(
753  std::get<IS>(input_terminals), key, task),
754  0)...};
755  junk[0]++;
756  }
757 #endif // 0
758 
759  template<typename T>
760  inline void transfer_ownership_impl(T&& arg, int device) {
761  if constexpr(!std::is_const_v<std::remove_reference_t<T>>) {
762  detail::foreach_parsec_data(arg, [&](parsec_data_t *data){
763  parsec_data_transfer_ownership_to_copy(data, device, PARSEC_FLOW_ACCESS_RW);
764  /* make sure we increment the version since we will modify the data */
765  data->device_copies[0]->version++;
766  });
767  }
768  }
769 
770  template<typename TT, std::size_t... Is>
771  inline void transfer_ownership(parsec_ttg_task_t<TT> *me, int device, std::index_sequence<Is...>) {
772  /* transfer ownership of each data */
773  int junk[] = {0,
775  *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<Is, typename TT::input_refs_tuple_type>> *>(
776  me->copies[Is]->get_ptr()), device), 0)...};
777  junk[0]++;
778  }
779 
780  template<typename TT>
781  inline parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
782  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
783  if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
784  transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
785  }
786  return me->template invoke_op<ttg::ExecutionSpace::Host>();
787  }
788 
789  template<typename TT>
790  inline parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
791  if constexpr(TT::derived_has_cuda_op()) {
792  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
793  return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
794  } else {
795  std::cerr << "CUDA hook called without having a CUDA op!" << std::endl;
796  return PARSEC_HOOK_RETURN_ERROR;
797  }
798  }
799 
800  template<typename TT>
801  inline parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
802  if constexpr(TT::derived_has_hip_op()) {
803  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
804  return me->template invoke_op<ttg::ExecutionSpace::HIP>();
805  } else {
806  std::cerr << "HIP hook called without having a HIP op!" << std::endl;
807  return PARSEC_HOOK_RETURN_ERROR;
808  }
809  }
810 
811  template<typename TT>
812  inline parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
813  if constexpr(TT::derived_has_level_zero_op()) {
814  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
815  return me->template invoke_op<ttg::ExecutionSpace::L0>();
816  } else {
817  std::cerr << "L0 hook called without having a L0 op!" << std::endl;
818  return PARSEC_HOOK_RETURN_ERROR;
819  }
820  }
821 
822 
823  template<typename TT>
824  inline parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task) {
825  if constexpr(TT::derived_has_cuda_op()) {
826  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
827  return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
828  } else {
829  return PARSEC_HOOK_RETURN_NEXT;
830  }
831  }
832 
833  template<typename TT>
834  inline parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task) {
835  if constexpr(TT::derived_has_hip_op()) {
836  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
837  return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
838  } else {
839  return PARSEC_HOOK_RETURN_NEXT;
840  }
841  }
842 
843  template<typename TT>
844  inline parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task) {
845  if constexpr(TT::derived_has_level_zero_op()) {
846  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
847  return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
848  } else {
849  return PARSEC_HOOK_RETURN_NEXT;
850  }
851  }
852 
853 
854  template <typename KeyT, typename ActivationCallbackT>
856  std::vector<KeyT> _keylist;
857  std::atomic<int> _outstanding_transfers;
858  ActivationCallbackT _cb;
860 
861  public:
862  rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
863  : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
864 
865  bool complete_transfer(void) {
866  int left = --_outstanding_transfers;
867  if (0 == left) {
868  _cb(std::move(_keylist), _copy);
869  return true;
870  }
871  return false;
872  }
873  };
874 
875  template <typename ActivationT>
876  static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
877  parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
878  void *cb_data) {
879  parsec_ce.mem_unregister(&lreg);
880  ActivationT *activation = static_cast<ActivationT *>(cb_data);
881  if (activation->complete_transfer()) {
882  delete activation;
883  }
884  return PARSEC_SUCCESS;
885  }
886 
887  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
888  int src, void *cb_data) {
889  std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
890  std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
891  (*fn)();
892  delete fn;
893  return PARSEC_SUCCESS;
894  }
895 
896  template <typename FuncT>
897  static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
898  int src, void *cb_data) {
899  std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
900  FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
901  (*fn_ptr)();
902  delete fn_ptr;
903  return PARSEC_SUCCESS;
904  }
905 
 /// Release one reference of the current task on @p copy.
 /// Decrements the reader count; when the last reader (or a mutable copy
 /// with a pending successor) is released, either hands the copy over to
 /// the deferred successor task or deletes it when no references remain.
906  inline void release_data_copy(ttg_data_copy_t *copy) {
907  if (copy->is_mutable() && nullptr == copy->get_next_task()) {
908  /* current task mutated the data but there are no consumers so prepare
909  * the copy to be freed below */
910  copy->reset_readers();
911  }
912 
913  int32_t readers = copy->num_readers();
914  if (readers > 1) {
915  /* potentially more than one reader, decrement atomically */
916  readers = copy->decrement_readers();
917  } else if (readers == 1) {
918  /* make sure readers drop to zero */
919  readers = copy->decrement_readers<false>();
920  }
921  /* if there was only one reader (the current task) or
922  * a mutable copy and a successor, we release the copy */
923  if (1 == readers || readers == copy->mutable_tag) {
 // acquire fence: pairs with the releasing writer so the data and the
 // next-task pointer read below are fully visible to this thread
924  std::atomic_thread_fence(std::memory_order_acquire);
925  if (nullptr != copy->get_next_task()) {
926  /* Release the deferred task.
927  * The copy was mutable and will be mutated by the released task,
928  * so simply transfer ownership.
929  */
930  parsec_task_t *next_task = copy->get_next_task();
931  copy->set_next_task(nullptr);
932  parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
933  copy->mark_mutable();
934  deferred_op->release_task();
935  } else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
936  /* we are the last reference, delete the copy */
937 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
 // debug mode: the copy must have been tracked at creation time
938  {
939  const std::lock_guard<std::mutex> lock(pending_copies_mutex);
940  size_t rc = pending_copies.erase(copy);
941  assert(1 == rc);
942  }
943 #endif
944 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
945  // Keep track of additional memory usage
946  if(ttg::default_execution_context().impl().profiling()) {
947  parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_free_datacopy,
948  static_cast<uint64_t>(copy->uid),
949  PROFILE_OBJECT_ID_NULL, &copy->size,
950  PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
951  }
952 #endif
953  delete copy;
954  }
955  }
956  }
957 
  template <typename Value>
  // NOTE(review): the function signature line is not visible in this extract
  // (a detail:: helper returning ttg_data_copy_t*, taking copy_in, task, and
  // readonly, judging by the body) -- confirm against the full source.
    ttg_data_copy_t *copy_res = copy_in;
    bool replace = false;
    int32_t readers = copy_in->num_readers();
    assert(readers != 0);

    /* try hard to defer writers if we cannot make copies
     * if deferral fails we have to bail out */
    bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->defer_writer;

    if (readonly && !copy_in->is_mutable()) {
      /* simply increment the number of readers */
      readers = copy_in->increment_readers();
    }

    if (readers == copy_in->mutable_tag) {
      /* the copy is (about to be) mutated by some other task */
      if (copy_res->get_next_task() != nullptr) {
        if (readonly) {
          parsec_ttg_task_base_t *next_task = reinterpret_cast<parsec_ttg_task_base_t *>(copy_res->get_next_task());
          if (next_task->defer_writer) {
            /* there is a writer but it signalled that it wants to wait for readers to complete */
            return copy_res;
          }
        }
      }
      /* someone is going to write into this copy -> we need to make a copy */
      copy_res = NULL;
      if (readonly) {
        /* we replace the copy in a deferred task if the copy will be mutated by
         * the deferred task and we are readonly.
         * That way, we can share the copy with other readonly tasks and release
         * the deferred task. */
        replace = true;
      }
    } else if (!readonly) {
      /* this task will mutate the data
       * check whether there are other readers already and potentially
       * defer the release of this task to give following readers a
       * chance to make a copy of the data before this task mutates it
       *
       * Try to replace the readers with a negative value that indicates
       * the value is mutable. If that fails we know that there are other
       * readers or writers already.
       *
       * NOTE: this check is not atomic: either there is a single reader
       * (current task) or there are others, in which we case won't
       * touch it.
       */
      if (1 == copy_in->num_readers() && !defer_writer) {
        // NOTE(review): a few lines are missing from this extract here
        // (presumably the readers -> mutable_tag exchange) -- confirm
        // against the full source.
        assert(nullptr == copy_in->get_next_task());
        copy_in->set_next_task(&task->parsec_task);
        /* release fence pairs with the acquire in release_data_copy() */
        std::atomic_thread_fence(std::memory_order_release);
        copy_in->mark_mutable();
      } else {
        if (defer_writer && nullptr == copy_in->get_next_task()) {
          /* we're the first writer and want to wait for all readers to complete */
          copy_res->set_next_task(&task->parsec_task);
          task->defer_writer = true;
        } else {
          /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */
          copy_res = NULL;
        }
      }
    }

    if (NULL == copy_res) {
      // can only make a copy if Value is copy-constructible ... so this codepath should never be hit
      if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
        ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast<Value *>(copy_in->get_ptr()));
        if (replace && nullptr != copy_in->get_next_task()) {
          /* replace the task that was deferred */
          parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
          new_copy->mark_mutable();
          /* replace the copy in the deferred task */
          for (int i = 0; i < deferred_op->data_count; ++i) {
            if (deferred_op->copies[i] == copy_in) {
              deferred_op->copies[i] = new_copy;
              break;
            }
          }
          copy_in->set_next_task(nullptr);
          deferred_op->release_task();
          copy_in->reset_readers();            // set the copy back to being read-only
          copy_in->increment_readers<false>(); // register as reader
          copy_res = copy_in;                  // return the copy we were passed
        } else {
          if (!readonly) {
            new_copy->mark_mutable();
          }
          copy_res = new_copy; // return the new copy
        }
      }
      else {
        throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t<Value>).name() + " but the type is not copyable");
      }
    }
    return copy_res;
  }
1061 
1062  } // namespace detail
1063 
  /// Initialize the TTG/PaRSEC runtime (and MPI, if not already initialized).
  ///
  /// @param argc,argv command-line arguments forwarded to MPI and PaRSEC
  /// @param num_threads number of compute threads per process
  /// @param ctx existing parsec_context_t to adopt (implementation-defined if null)
  /// @throws std::runtime_error if called more than once, if MPI cannot
  ///         provide MPI_THREAD_MULTIPLE, or if device enumeration is malformed
  inline void ttg_initialize(int argc, char **argv, int num_threads, parsec_context_t *ctx) {
    if (detail::initialized_mpi()) throw std::runtime_error("ttg_parsec::ttg_initialize: can only be called once");

    // make sure it's not already initialized
    int mpi_initialized;
    MPI_Initialized(&mpi_initialized);
    if (!mpi_initialized) {  // MPI not initialized? do it, remember that we did it
      int provided;
      MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
      if (!provided)
        throw std::runtime_error("ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
      detail::initialized_mpi() = true;
    } else {  // no way to test that MPI was initialized with MPI_THREAD_MULTIPLE, cross fingers and proceed
    }

    // create the default world and install it
    auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads, ctx};
    std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
    ttg::World world{std::move(world_sptr)};
    ttg::detail::set_default_world(std::move(world));

    // query the first device ID
    // NOTE(review): the statement that records the first GPU device id inside
    // the first branch below is not visible in this extract -- confirm
    // against the full source.
    for (int i = 0; i < parsec_nb_devices; ++i) {
      bool is_gpu = parsec_mca_device_is_gpu(i);
      if (detail::first_device_id == -1 && is_gpu) {
      } else if (detail::first_device_id > -1 && !is_gpu) {
        throw std::runtime_error("PaRSEC: Found non-GPU device in GPU ID range!");
      }
    }

    /* parse the maximum inline size */
    const char* ttg_max_inline_cstr = std::getenv("TTG_MAX_INLINE");
    if (nullptr != ttg_max_inline_cstr) {
      std::size_t inline_size = std::atol(ttg_max_inline_cstr);
      // only ever shrink the inline threshold, never grow it past the default
      if (inline_size < detail::max_inline_size) {
        detail::max_inline_size = inline_size;
      }
    }

    bool all_peer_access = true;
    /* check whether all GPUs can access all peer GPUs */
    for (int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
      parsec_device_module_t *idevice = parsec_mca_device_get(i);
      if (PARSEC_DEV_IS_GPU(idevice->type)) {
        parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
        for (int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
          if (i != j) { // because no device can access itself, says PaRSEC
            parsec_device_module_t *jdevice = parsec_mca_device_get(j);
            if (PARSEC_DEV_IS_GPU(jdevice->type)) {
              all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true : false;
            }
          }
        }
      }
    }
    detail::all_devices_peer_access = all_peer_access;
  }
  /// Finalize the TTG/PaRSEC runtime: drain the default world, destroy all
  /// worlds, and finalize MPI if ttg_initialize() was the one to initialize it.
  inline void ttg_finalize() {
    // We need to notify the current taskpool of termination if we are in user termination detection mode
    // or the parsec_context_wait() in destroy_worlds() will never complete
    ttg::default_execution_context().impl().final_task();
    ttg::detail::set_default_world(ttg::World{});  // reset the default world
    ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
    if (detail::initialized_mpi()) MPI_Finalize();  // only if we called MPI_Init_thread ourselves
  }
1134  [[noreturn]]
1135  inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); std::abort(); }
1136  inline void ttg_execute(ttg::World world) { world.impl().execute(); }
1137  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
1138 
1139  template <typename T>
1140  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
1141  world.impl().register_ptr(ptr);
1142  }
1143 
1144  template <typename T>
1145  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
1146  world.impl().register_ptr(std::move(ptr));
1147  }
1148 
1149  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
1150  world.impl().register_status(status_ptr);
1151  }
1152 
1153  template <typename Callback>
1154  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
1155  world.impl().register_callback(std::forward<Callback>(callback));
1156  }
1157 
1158  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
1159 
1160  inline void ttg_sum(ttg::World world, double &value) {
1161  double result = 0.0;
1162  MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
1163  value = result;
1164  }
1165 
1166  inline void make_executable_hook(ttg::World& world) {
1167  MPI_Barrier(world.impl().comm());
1168  }
1169 
  /// Broadcast @p data from @p source_rank to all other ranks of @p world.
  /// The datum is serialized into a byte buffer on the source, its size and
  /// contents broadcast via MPI, and deserialized on the receivers.
  template <typename T>
  void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
    int64_t BUFLEN;
    if (world.rank() == source_rank) {
      // NOTE(review): the line computing BUFLEN (serialized size of data) is
      // not visible in this extract -- confirm against the full source.
    }
    MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());

    unsigned char *buf = new unsigned char[BUFLEN];
    if (world.rank() == source_rank) {
      // NOTE(review): serialization of data into buf is elided in this extract.
    }
    MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
    if (world.rank() != source_rank) {
      // NOTE(review): deserialization of buf into data is elided in this extract.
    }
    delete[] buf;
  }
1190 
  namespace detail {

    /// Non-templated base of the PaRSEC-backed TT: holds the PaRSEC-facing
    /// state shared by all TT template instantiations.
    struct ParsecTTBase {
     protected:
      //  static std::map<int, ParsecBaseTT*> function_id_to_instance;
      parsec_hash_table_t tasks_table;            // hash table of this TT's tasks
      parsec_hash_table_t task_constraint_table;  // hash table for constraint handling
      parsec_task_class_t self;                   // the PaRSEC task class backing this TT
    };

  }  // namespace detail
1202 
1203  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
1205  private:
1207  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1208  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1209  // create a virtual control input if the input list is empty, to be used in invoke()
1210  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1212  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
1213  static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1214  "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1215  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "Input typelist cannot contain reference types");
1216  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
1217 
    parsec_mempool_t mempools;  // per-thread memory pools for task allocation

    // check for a non-type member named have_cuda_op
    template <typename T>
    using have_cuda_op_non_type_t = decltype(T::have_cuda_op);

    // check for a non-type member named have_hip_op
    template <typename T>
    using have_hip_op_non_type_t = decltype(T::have_hip_op);

    // check for a non-type member named have_level_zero_op
    template <typename T>
    using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);

    // presumably cleared when the TT is torn down -- verify where it is written
    bool alive = true;

    static constexpr int numinedges = std::tuple_size_v<input_tuple_type>;     // number of input edges
    static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>;  // number of input arguments
    static constexpr int numouts = std::tuple_size_v<output_terminalsT>;       // number of outputs
    static constexpr int numflows = std::max(numins, numouts);                 // max number of flows
1236 
1237  public:
1239  template<typename DerivedT = derivedT>
1240  static constexpr bool derived_has_cuda_op() {
1241  return Space == ttg::ExecutionSpace::CUDA;
1242  }
1243 
1245  template<typename DerivedT = derivedT>
1246  static constexpr bool derived_has_hip_op() {
1247  return Space == ttg::ExecutionSpace::HIP;
1248  }
1249 
1251  template<typename DerivedT = derivedT>
1252  static constexpr bool derived_has_level_zero_op() {
1253  return Space == ttg::ExecutionSpace::L0;
1254  }
1255 
1257  template<typename DerivedT = derivedT>
1258  static constexpr bool derived_has_device_op() {
1259  return (derived_has_cuda_op<DerivedT>() ||
1260  derived_has_hip_op<DerivedT>() ||
1261  derived_has_level_zero_op<DerivedT>());
1262  }
1263 
1266  "Data sent from a device-capable template task must be serializable.");
1267 
1268  using ttT = TT;
1269  using key_type = keyT;
1271  using input_args_type = actual_input_tuple_type;
1273  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
1275  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1277  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1278  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
1279  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
1280 
1281  static constexpr int numinvals =
1282  std::tuple_size_v<input_refs_tuple_type>; // number of input arguments with values (i.e. omitting the control
1283  // input, if any)
1284 
1285  using output_terminals_type = output_terminalsT;
1287 
1288  template <std::size_t i, typename resultT, typename InTuple>
1289  static resultT get(InTuple &&intuple) {
1290  return static_cast<resultT>(std::get<i>(std::forward<InTuple>(intuple)));
1291  };
1292  template <std::size_t i, typename InTuple>
1293  static auto &get(InTuple &&intuple) {
1294  return std::get<i>(std::forward<InTuple>(intuple));
1295  };
1296 
1297  private:
1298  using task_t = detail::parsec_ttg_task_t<ttT>;
1299 
1301  friend task_t;
1302 
1303  /* the offset of the key placed after the task structure in the memory from mempool */
1304  constexpr static const size_t task_key_offset = sizeof(task_t);
1305 
1306  input_terminals_type input_terminals;
1307  output_terminalsT output_terminals;
1308 
1309  protected:
1310  const auto &get_output_terminals() const { return output_terminals; }
1311 
1312  private:
    /// Build the dispatch table input index -> &TT::set_arg_from_msg<i>,
    /// used to route incoming argument messages to the matching input.
    template <std::size_t... IS>
    static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(set_arg_from_msg_fcts);
      return resultT{{&TT::set_arg_from_msg<IS>...}};
    }
    constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_arg_from_msg_fcts =
        make_set_args_fcts(std::make_index_sequence<numins>{});

    /// Dispatch table for argstream-size messages (one entry per input).
    template <std::size_t... IS>
    static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(set_argstream_size_from_msg_fcts);
      return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
    }
    constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
        make_set_size_fcts(std::make_index_sequence<numins>{});

    /// Dispatch table for argstream-finalize messages (one entry per input).
    template <std::size_t... IS>
    static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(finalize_argstream_from_msg_fcts);
      return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
    }
    constexpr static std::array<void (TT::*)(void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
        make_finalize_argstream_fcts(std::make_index_sequence<numins>{});

    /// Dispatch table for pull-terminal data requests (one entry per input edge).
    template <std::size_t... IS>
    static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(get_from_pull_msg_fcts);
      return resultT{{&TT::get_from_pull_msg<IS>...}};
    }
    constexpr static std::array<void (TT::*)(void *, std::size_t), numinedges> get_from_pull_msg_fcts =
        make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});

    /// Per-input flag: is the i-th input argument type declared const?
    template<std::size_t... IS>
    constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
      using resultT = decltype(input_is_const);
      return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
    }
    constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1351 
    ttg::World world;                          // the world this TT belongs to
    ttg::meta::detail::keymap_t<keyT> keymap;  // key -> rank mapping
    ttg::meta::detail::keymap_t<keyT> priomap; // key -> priority mapping
    // key -> preferred device mapping (used in device_static_evaluate below)
    ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
    // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
    ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
        input_reducers;
    std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = { nullptr };
    // NOTE(review): this aggregate init sets only element 0 to max(); the
    // remaining elements are zero-initialized -- confirm that is intended
    std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
    int num_pullins = 0;  // number of pull-type inputs

    bool m_defer_writer = TTG_PARSEC_DEFER_WRITER;  // defer writers until readers complete?

    std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
    std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1367 
1368  public:
1369  ttg::World get_world() const override final { return world; }
1370 
1371  private:
1375  template <typename... Args>
1376  auto op(Args &&...args) {
1377  derivedT *derived = static_cast<derivedT *>(this);
1378  using return_type = decltype(derived->op(std::forward<Args>(args)...));
1379  if constexpr (std::is_same_v<return_type,void>) {
1380  derived->op(std::forward<Args>(args)...);
1381  return;
1382  }
1383  else {
1384  return derived->op(std::forward<Args>(args)...);
1385  }
1386  }
1387 
1388  template <std::size_t i, typename terminalT, typename Key>
1389  void invoke_pull_terminal(terminalT &in, const Key &key, detail::parsec_ttg_task_base_t *task) {
1390  if (in.is_pull_terminal) {
1391  auto owner = in.container.owner(key);
1392  if (owner != world.rank()) {
1393  get_pull_terminal_data_from<i>(owner, key);
1394  } else {
1395  // push the data to the task
1396  set_arg<i>(key, (in.container).get(key));
1397  }
1398  }
1399  }
1400 
1401  template <std::size_t i, typename Key>
1402  void get_pull_terminal_data_from(const int owner,
1403  const Key &key) {
1404  using msg_t = detail::msg_t;
1405  auto &world_impl = world.impl();
1406  parsec_taskpool_t *tp = world_impl.taskpool();
1407  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), tp->taskpool_id,
1409  world.rank(), 1);
1410  /* pack the key */
1411  size_t pos = 0;
1412  pos = pack(key, msg->bytes, pos);
1413  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1414  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1415  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
1416  sizeof(msg_header_t) + pos);
1417  }
1418 
1419  template <std::size_t... IS, typename Key = keyT>
1420  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
1421  int junk[] = {0, (invoke_pull_terminal<IS>(
1422  std::get<IS>(input_terminals), key, task),
1423  0)...};
1424  junk[0]++;
1425  }
1426 
1427  template <std::size_t... IS>
1428  static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1429  return input_refs_tuple_type{static_cast<std::tuple_element_t<IS, input_refs_tuple_type>>(
1430  *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>> *>(
1431  task->copies[IS]->get_ptr()))...};
1432  }
1433 
#ifdef TTG_HAVE_DEVICE
    /// PaRSEC device "submit" hook: resumes the device coroutine once input
    /// transfers have completed, and decides whether PaRSEC must call this
    /// hook again (kernel still pending) or the task is moving on.
    ///
    /// @return PARSEC_HOOK_RETURN_DONE when this hook will not be re-entered,
    ///         PARSEC_HOOK_RETURN_AGAIN when it must run again after the kernel.
    static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
                                    parsec_gpu_task_t *gpu_task,
                                    parsec_gpu_exec_stream_t *gpu_stream) {

      task_t *task = (task_t*)gpu_task->ec;
      // get the device task from the coroutine handle
      ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

      task->dev_ptr->stream = gpu_stream;

      //std::cout << "device_static_submit task " << task << std::endl;

      // get the promise which contains the views
      auto dev_data = dev_task.promise();

      /* we should still be waiting for the transfer to complete */
      assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
             dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);

      /* publish the selected device and its backend stream to the coroutine */
#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
      {
        parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
        int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
        ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
      }
#elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
      {
        parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
        int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
        ttg::device::detail::set_current(device, hip_stream->hip_stream);
      }
#elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
      {
        parsec_level_zero_exec_stream_t *stream;
        stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
        int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
        ttg::device::detail::set_current(device, stream->swq->queue);
      }
#endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)

      /* Here we call back into the coroutine again after the transfers have completed */
      static_op(&task->parsec_task);

      // NOTE(review): one line is not visible in this extract at this point --
      // confirm against the full source.

      auto discard_tmp_flows = [&](){
        for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
          if (gpu_task->flow[i]->flow_flags & TTG_PARSEC_FLOW_ACCESS_TMP) {
            /* temporary flow, discard by setting it to read-only to avoid evictions */
            const_cast<parsec_flow_t*>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
            task->parsec_task.data[i].data_out->readers = 1;
          }
        }
      };

      /* we will come back into this function once the kernel and transfers are done */
      int rc = PARSEC_HOOK_RETURN_DONE;
      if (nullptr != task->suspended_task_address) {
        /* Get a new handle for the promise*/
        dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
        dev_data = dev_task.promise();

        assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
               dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
               dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);

        if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
            ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
          /* the task started sending so we won't come back here */
          //std::cout << "device_static_submit task " << task << " complete" << std::endl;
          discard_tmp_flows();
        } else {
          //std::cout << "device_static_submit task " << task << " return-again" << std::endl;
          rc = PARSEC_HOOK_RETURN_AGAIN;
        }
      } else {
        /* the task is done so we won't come back here */
        //std::cout << "device_static_submit task " << task << " complete" << std::endl;
        discard_tmp_flows();
      }
      return rc;
    }
1520 
    /// PaRSEC "evaluate" hook for device tasks: on first invocation per task,
    /// allocates and fills the parsec_gpu_task_t, runs the coroutine up to its
    /// first suspension point (which populates the flows via
    /// register_device_memory), installs a per-task task class that carries
    /// those flows, and applies the user's device hint.
    ///
    /// @return PARSEC_HOOK_RETURN_DONE to select this chore,
    ///         PARSEC_HOOK_RETURN_ERROR if a gpu_task was already assigned.
    static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {

      task_t *task = (task_t*)parsec_task;
      if (task->dev_ptr->gpu_task == nullptr) {

        /* set up a device task */
        parsec_gpu_task_t *gpu_task;
        /* PaRSEC takes ownership and frees the gpu_task, so it must come from calloc */
        gpu_task = static_cast<parsec_gpu_task_t*>(std::calloc(1, sizeof(*gpu_task)));
        PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
        gpu_task->ec = parsec_task;
        gpu_task->task_type = 0; // user task
        gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max(); // used internally
        gpu_task->pushout = 0;
        gpu_task->submit = &TT::device_static_submit;

        // one way to force the task device
        // currently this will probably break all of PaRSEC if this hint
        // does not match where the data is located, not really useful for us
        // instead we set a hint on the data if there is no hint set yet
        //parsec_task->selected_device = ...;

        /* set the gpu_task so it's available in register_device_memory */
        task->dev_ptr->gpu_task = gpu_task;

        /* TODO: is this the right place to set the mask? */
        task->parsec_task.chore_mask = PARSEC_DEV_ALL;

        /* copy over the task class, because that's what we need */
        task->dev_ptr->task_class = *task->parsec_task.task_class;

        // first invocation of the coroutine to get the coroutine handle
        static_op(parsec_task);

        /* when we come back here, the flows in gpu_task are set (see register_device_memory) */

        parsec_task_class_t& tc = task->dev_ptr->task_class;

        // input flows are set up during register_device_memory as part of the first invocation above
        for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
          tc.in[i]  = gpu_task->flow[i];
          tc.out[i] = gpu_task->flow[i];
        }
        tc.nb_flows = MAX_PARAM_COUNT;

        /* set the device hint on the data */
        TT *tt = task->tt;
        if (tt->devicemap) {
          int parsec_dev;
          if constexpr (std::is_void_v<keyT>) {
            parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap());
          } else {
            parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap(task->key));
          }
          for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
            /* only set on mutable data since we have exclusive access */
            if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
              parsec_data_t *data = parsec_task->data[i].data_in->original;
              /* only set the preferred device if the host has the latest copy
               * as otherwise we may end up with the wrong data if there is a newer
               * version on a different device. Also, keep fingers crossed. */
              if (data->owner_device == 0) {
                parsec_advise_data_on_device(data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
              }
            }
          }
        }

        /* set the new task class that contains the flows */
        task->parsec_task.task_class = &task->dev_ptr->task_class;

        /* select this one */
        return PARSEC_HOOK_RETURN_DONE;
      }

      std::cerr << "EVALUATE called on task with assigned GPU task!" << std::endl;

      /* not sure if this might happen*/
      return PARSEC_HOOK_RETURN_ERROR;

    }
1602 
    /// PaRSEC execution hook for device tasks: once a device has been selected,
    /// hands the prepared gpu_task to the PaRSEC device kernel scheduler for
    /// the backend matching this TT's execution space.
    ///
    /// @return PARSEC_HOOK_RETURN_DONE if the task completed immediately or
    ///         jumped straight to send-out; otherwise the scheduler's result.
    ///         Aborts on a device type that does not match the TT's space.
    static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
      static_assert(derived_has_device_op());

      /* when we come in here we have a device assigned and are ready to go */

      task_t *task = (task_t*)parsec_task;

      if (nullptr == task->suspended_task_address) {
        /* short-cut in case the task returned immediately */
        return PARSEC_HOOK_RETURN_DONE;
      }

      // get the device task from the coroutine handle
      auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

      // get the promise which contains the views
      ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();

      if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
          dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
        /* the task jumped directly to send or returned so we're done */
        return PARSEC_HOOK_RETURN_DONE;
      }

      parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
      assert(NULL != device);

      task->dev_ptr->device = device;
      parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
      parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();

      switch(device->super.type) {

#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
        case PARSEC_DEV_CUDA:
          if constexpr (Space == ttg::ExecutionSpace::CUDA) {
            /* TODO: we need custom staging functions because PaRSEC looks at the
             * task-class to determine the number of flows. */
            gpu_task->stage_in  = parsec_default_gpu_stage_in;
            gpu_task->stage_out = parsec_default_gpu_stage_out;
            return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
          }
          break;
#endif
#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
        case PARSEC_DEV_HIP:
          if constexpr (Space == ttg::ExecutionSpace::HIP) {
            gpu_task->stage_in  = parsec_default_gpu_stage_in;
            gpu_task->stage_out = parsec_default_gpu_stage_out;
            return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
          }
          break;
#endif // PARSEC_HAVE_DEV_HIP_SUPPORT
#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
        case PARSEC_DEV_LEVEL_ZERO:
          if constexpr (Space == ttg::ExecutionSpace::L0) {
            gpu_task->stage_in  = parsec_default_gpu_stage_in;
            gpu_task->stage_out = parsec_default_gpu_stage_out;
            return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
          }
          break;
#endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
        default:
          break;
      }
      ttg::print_error(task->tt->get_name(), " : received mismatching device type ", (int)device->super.type, " from PaRSEC");
      ttg::abort();
      return PARSEC_HOOK_RETURN_DONE; // will not be reached
    }
1672 #endif // TTG_HAVE_DEVICE
1673 
  /// PaRSEC hook that executes (or resumes) the TT's body on a CPU thread.
  ///
  /// If the task carries no suspended coroutine, invokes the user's op() via the
  /// overload matching (void vs. non-void key) x (empty vs. non-empty input tuple);
  /// TTG_PROCESS_TT_OP_RETURN captures a possible coroutine suspension.
  /// Otherwise the previously suspended coroutine (device task or resumable task)
  /// is resumed and destroyed upon completion.
  /// \param parsec_task the PaRSEC task, actually a task_t
  /// \return PARSEC_HOOK_RETURN_DONE
  static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {

    task_t *task = (task_t*)parsec_task;
    void* suspended_task_address =
#ifdef TTG_HAVE_COROUTINE
      task->suspended_task_address; // non-null = need to resume the task
#else  // TTG_HAVE_COROUTINE
      nullptr;
#endif // TTG_HAVE_COROUTINE
    //std::cout << "static_op: suspended_task_address " << suspended_task_address << std::endl;
    if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function

      ttT *baseobj = task->tt;
      derivedT *obj = static_cast<derivedT *>(baseobj);
      assert(detail::parsec_ttg_caller == nullptr);
      // publish the current task so that calls made from within op() (e.g. send/broadcast)
      // can find the task that issued them
      detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
      if (obj->tracing()) {
        if constexpr (!ttg::meta::is_void_v<keyT>)
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": executing");
        else
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : executing");
      }

      // dispatch to the user op overload matching the key/input-tuple signature
      if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
        auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
        TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
      } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
        TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
      } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
        auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
        TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
      } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
        TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
      } else {
        ttg::abort();
      }
      detail::parsec_ttg_caller = nullptr;
    }
    else {  // resume the suspended coroutine

#ifdef TTG_HAVE_COROUTINE
      assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);

#ifdef TTG_HAVE_DEVICE
      if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
        ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
        assert(detail::parsec_ttg_caller == nullptr);
        detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
        // TODO: unify the outputs tls handling
        auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
        task->tt->set_outputs_tls_ptr();
        coro.resume();
        if (coro.completed()) {
          coro.destroy();
          suspended_task_address = nullptr;  // nothing left to resume
        }
        task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
        detail::parsec_ttg_caller = nullptr;
      } else
#endif // TTG_HAVE_DEVICE
      if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) {
        auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
        assert(ret.ready());
        auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
        task->tt->set_outputs_tls_ptr();
        ret.resume();
        if (ret.completed()) {
          ret.destroy();
          suspended_task_address = nullptr;
        }
        else { // not yet completed
          // leave suspended_task_address as is

          // right now events are not properly implemented, we are only testing the workflow with dummy events
          // so mark the events finished manually; parsec will rerun this task and it should complete the second time
          auto events = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
          for (auto &event_ptr : events) {
            event_ptr->finish();
          }
          assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
        }
        task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
        detail::parsec_ttg_caller = nullptr;
        task->suspended_task_address = suspended_task_address;
      }
      else
        ttg::abort();  // unrecognized task id
#else  // TTG_HAVE_COROUTINE
      ttg::abort();  // should not happen
#endif // TTG_HAVE_COROUTINE
    }
#ifdef TTG_HAVE_COROUTINE
    // persist the (possibly cleared) coroutine address back into the task
    task->suspended_task_address = suspended_task_address;
#endif // TTG_HAVE_COROUTINE
    if (suspended_task_address == nullptr) {
      ttT *baseobj = task->tt;
      derivedT *obj = static_cast<derivedT *>(baseobj);
      if (obj->tracing()) {
        if constexpr (!ttg::meta::is_void_v<keyT>)
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
        else
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
      }
    }

    return PARSEC_HOOK_RETURN_DONE;
  }
1781 
1782  static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1783  task_t *task = static_cast<task_t*>(parsec_task);
1784 
1785  void* suspended_task_address =
1786 #ifdef TTG_HAVE_COROUTINE
1787  task->suspended_task_address; // non-null = need to resume the task
1788 #else // TTG_HAVE_COROUTINE
1789  nullptr;
1790 #endif // TTG_HAVE_COROUTINE
1791  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1792  ttT *baseobj = (ttT *)task->object_ptr;
1793  derivedT *obj = (derivedT *)task->object_ptr;
1794  assert(detail::parsec_ttg_caller == NULL);
1796  if constexpr (!ttg::meta::is_void_v<keyT>) {
1797  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1798  } else if constexpr (ttg::meta::is_void_v<keyT>) {
1799  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
1800  } else // unreachable
1801  ttg:: abort();
1803  }
1804  else {
1805 #ifdef TTG_HAVE_COROUTINE
1806  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
1807  assert(ret.ready());
1808  ret.resume();
1809  if (ret.completed()) {
1810  ret.destroy();
1811  suspended_task_address = nullptr;
1812  }
1813  else { // not yet completed
1814  // leave suspended_task_address as is
1815  }
1816 #else // TTG_HAVE_COROUTINE
1817  ttg::abort(); // should not happen
1818 #endif // TTG_HAVE_COROUTINE
1819  }
1820  task->suspended_task_address = suspended_task_address;
1821 
1822  if (suspended_task_address) {
1823  ttg::abort(); // not yet implemented
1824  // see comments in static_op()
1825  return PARSEC_HOOK_RETURN_AGAIN;
1826  }
1827  else
1828  return PARSEC_HOOK_RETURN_DONE;
1829  }
1830 
  /// PaRSEC hook that drains queued stream inputs for input terminal \p i and
  /// reduces them into the parent task's data copy via the user-provided reducer.
  ///
  /// Pops source copies off the stream's LIFO until the pending reduce_count
  /// drops to zero; once the stream goal has been reached the parent task is
  /// released (release_task decides whether it is actually runnable).
  /// \tparam i index of the input terminal whose stream is being reduced
  /// \param es the executing PaRSEC execution stream (unused)
  /// \param parsec_task the reducer task, actually a detail::reducer_task_t
  /// \return PARSEC_HOOK_RETURN_DONE
  template <std::size_t i>
  static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    using rtask_t = detail::reducer_task_t;
    using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
    constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
    constexpr const bool input_is_const = std::is_const_v<value_t>;
    rtask_t *rtask = (rtask_t*)parsec_task;
    task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
    ttT *baseobj = parent_task->tt;
    derivedT *obj = static_cast<derivedT *>(baseobj);

    auto& reducer = std::get<i>(baseobj->input_reducers);

    //std::cout << "static_reducer_op " << parent_task->key << std::endl;

    if (obj->tracing()) {
      if constexpr (!ttg::meta::is_void_v<keyT>)
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
      else
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
    }

    /* the copy to reduce into */
    detail::ttg_data_copy_t *target_copy;
    target_copy = parent_task->copies[i];
    assert(val_is_void || nullptr != target_copy);
    /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
    std::size_t c = 0;
    std::size_t size = 0;
    assert(parent_task->streams[i].reduce_count > 0);
    if (rtask->is_first) {
      // decrement our own "first" contribution; if that leaves zero pending there
      // is nothing queued yet and a later reducer task will do the work
      if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
        /* we were the first and there is nothing to be done */
        if (obj->tracing()) {
          if constexpr (!ttg::meta::is_void_v<keyT>)
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
          else
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
        }

        return PARSEC_HOOK_RETURN_DONE;
      }
    }

    assert(detail::parsec_ttg_caller == NULL);
    detail::parsec_ttg_caller = rtask->parent_task;

    do {
      if constexpr(!val_is_void) {
        /* the copies to reduce out of */
        detail::ttg_data_copy_t *source_copy;
        parsec_list_item_t *item;
        item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
        if (nullptr == item) {
          // maybe someone is changing the goal right now
          break;
        }
        source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
        assert(target_copy->num_readers() == target_copy->mutable_tag);
        assert(source_copy->num_readers() > 0);
        reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
                *reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
        detail::release_data_copy(source_copy);
      } else if constexpr(val_is_void) {
        reducer(); // invoke control reducer
      }
      // there is only one task working on this stream, so no need to be atomic here
      size = ++parent_task->streams[i].size;
      //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
    } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
    //} while ((c = (--task->streams[i].reduce_count)) > 0);

    /* finalize_argstream sets goal to 1, so size may be larger than goal */
    bool complete = (size >= parent_task->streams[i].goal);

    //std::cout << "static_reducer_op size " << size
    //          << " of " << parent_task->streams[i].goal << " complete " << complete
    //          << " c " << c << std::endl;
    if (complete && c == 0) {
      if constexpr(input_is_const) {
        /* make the consumer task a reader if its input is const */
        target_copy->reset_readers();
      }
      /* task may not be runnable yet because other inputs are missing, have release_task decide */
      parent_task->remove_from_hash = true;
      parent_task->release_task(parent_task);
    }

    // NOTE(review): detail::parsec_ttg_caller does not appear to be reset to
    // nullptr before returning in this view — confirm (a reset line may have
    // been lost here).

    if (obj->tracing()) {
      if constexpr (!ttg::meta::is_void_v<keyT>)
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
      else
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
    }

    return PARSEC_HOOK_RETURN_DONE;
  }
1930 
1931 
1932  protected:
  /// Deserializes an object from a message buffer using its default data descriptor.
  /// NOTE(review): relies on an alias dd_t (the data descriptor for std::decay_t<T>)
  /// whose declaration is not visible in this view — confirm it precedes this code.
  /// \param obj [out] object to unpack into
  /// \param _bytes the source buffer
  /// \param pos current read offset into \p _bytes
  /// \return the offset just past the consumed payload
  template <typename T>
  uint64_t unpack(T &obj, void *_bytes, uint64_t pos) {
    uint64_t payload_size;
    if constexpr (!dd_t::serialize_size_is_const) {
      // variable-size payload: pack() wrote the size ahead of the payload, read it back
      pos = ttg::default_data_descriptor<uint64_t>::unpack_payload(&payload_size, sizeof(uint64_t), pos, _bytes);
    } else {
      payload_size = dd_t::payload_size(&obj);
    }
    pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
    return pos;
  }
1945 
  /// Serializes an object into a message buffer using its default data descriptor.
  /// For variable-size types the payload size is written ahead of the payload so
  /// that unpack() can recover it.
  /// NOTE(review): relies on an alias dd_t (the data descriptor for std::decay_t<T>)
  /// whose declaration is not visible in this view — confirm it precedes this code.
  /// \param obj object to pack
  /// \param bytes the destination buffer
  /// \param pos current write offset into \p bytes
  /// \param copy optional data copy associated with \p obj (unused in this overload)
  /// \return the offset just past the written payload
  template <typename T>
  uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) {
    uint64_t payload_size = dd_t::payload_size(&obj);
    if constexpr (!dd_t::serialize_size_is_const) {
      pos = ttg::default_data_descriptor<uint64_t>::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes);
    }
    pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
    return pos;
  }
1956 
1957  static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop) {
1958  assert(size >= sizeof(msg_header_t) &&
1959  "Trying to unpack as message that does not hold enough bytes to represent a single header");
1960  msg_header_t *hd = static_cast<msg_header_t *>(data);
1961  derivedT *obj = reinterpret_cast<derivedT *>(bop);
1962  switch (hd->fn_id) {
1964  if (0 <= hd->param_id) {
1965  assert(hd->param_id >= 0);
1966  assert(hd->param_id < obj->set_arg_from_msg_fcts.size());
1967  auto member = obj->set_arg_from_msg_fcts[hd->param_id];
1968  (obj->*member)(data, size);
1969  } else {
1970  // there is no good reason to have negative param ids
1971  ttg::abort();
1972  }
1973  break;
1974  }
1976  assert(hd->param_id >= 0);
1977  assert(hd->param_id < obj->set_argstream_size_from_msg_fcts.size());
1978  auto member = obj->set_argstream_size_from_msg_fcts[hd->param_id];
1979  (obj->*member)(data, size);
1980  break;
1981  }
1983  assert(hd->param_id >= 0);
1984  assert(hd->param_id < obj->finalize_argstream_from_msg_fcts.size());
1985  auto member = obj->finalize_argstream_from_msg_fcts[hd->param_id];
1986  (obj->*member)(data, size);
1987  break;
1988  }
1990  assert(hd->param_id >= 0);
1991  assert(hd->param_id < obj->get_from_pull_msg_fcts.size());
1992  auto member = obj->get_from_pull_msg_fcts[hd->param_id];
1993  (obj->*member)(data, size);
1994  break;
1995  }
1996  default:
1997  ttg::abort();
1998  }
1999  }
2000 
2002  inline parsec_thread_mempool_t *get_task_mempool(void) {
2003  auto &world_impl = world.impl();
2004  parsec_execution_stream_s *es = world_impl.execution_stream();
2005  int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
2006  return &mempools.thread_mempools[index];
2007  }
2008 
  /// Delivers a received data copy to all tasks identified by \p keylist as
  /// their i-th input argument.
  ///
  /// A short-lived "dummy" task is created to own the copy while it is being
  /// distributed, so that set_arg_local_impl can reuse the copy instead of
  /// duplicating it. Copy-constructible values can be broadcast to any number
  /// of keys; move-only values can only be delivered to a single key.
  /// \tparam i index of the input terminal being set
  /// \tparam valueT the value type stored in \p copy
  /// \param keylist keys of the receiving tasks (all mapped to this rank)
  /// \param copy the data copy received from the network
  template <size_t i, typename valueT>
  void set_arg_from_msg_keylist(ttg::span<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
    /* create a dummy task that holds the copy, which can be reused by others */
    task_t *dummy;
    parsec_execution_stream_s *es = world.impl().execution_stream();
    parsec_thread_mempool_t *mempool = get_task_mempool();
    dummy = new (parsec_thread_mempool_allocate(mempool)) task_t(mempool, &this->self, this);
    dummy->set_dummy(true);
    // TODO: do we need to copy static_stream_goal in dummy?

    /* set the received value as the dummy's only data */
    dummy->copies[0] = copy;

    /* We received the task on this world, so it's using the same taskpool */
    dummy->parsec_task.taskpool = world.impl().taskpool();

    /* save the current task and set the dummy task */
    auto parsec_ttg_caller_save = detail::parsec_ttg_caller;
    detail::parsec_ttg_caller = dummy;

    /* iterate over the keys and have them use the copy we made */
    parsec_task_t *task_ring = nullptr;
    for (auto &&key : keylist) {
      // copy-constructible? can broadcast to any number of keys
      if constexpr (std::is_copy_constructible_v<valueT>) {
        set_arg_local_impl<i>(key, *reinterpret_cast<valueT *>(copy->get_ptr()), copy, &task_ring);
      }
      else {
        // not copy-constructible? can move, but only to single key
        static_assert(!std::is_reference_v<valueT>);
        if (std::size(keylist) == 1)
          set_arg_local_impl<i>(key, std::move(*reinterpret_cast<valueT *>(copy->get_ptr())), copy, &task_ring);
        else {
          throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t<valueT>).name() + " but the type is not copyable");
        }
      }
    }

    // schedule all tasks that became ready while distributing the copy
    if (nullptr != task_ring) {
      auto &world_impl = world.impl();
      parsec_task_t *vp_task_ring[1] = { task_ring };
      __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
    }

    /* restore the previous task */
    detail::parsec_ttg_caller = parsec_ttg_caller_save;

    /* release the dummy task */
    complete_task_and_release(es, &dummy->parsec_task);
    parsec_thread_mempool_free(mempool, &dummy->parsec_task);
  }
2060 
2061  // there are 6 types of set_arg:
2062  // - case 1: nonvoid Key, complete Value type
2063  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
2064  // - case 3: nonvoid Key, void Value, no inputs
2065  // - case 4: void Key, complete Value type
2066  // - case 5: void Key, void Value, mixed (data+control) inputs
2067  // - case 6: void Key, void Value, no inputs
2068  // implementation of these will be further split into "local-only" and global+local
2069 
2070  template <std::size_t i>
2071  void set_arg_from_msg(void *data, std::size_t size) {
2072  using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2073  using msg_t = detail::msg_t;
2074  msg_t *msg = static_cast<msg_t *>(data);
2075  if constexpr (!ttg::meta::is_void_v<keyT>) {
2076  /* unpack the keys */
2077  /* TODO: can we avoid copying all the keys?! */
2078  uint64_t pos = msg->tt_id.key_offset;
2079  uint64_t key_end_pos;
2080  std::vector<keyT> keylist;
2081  int num_keys = msg->tt_id.num_keys;
2082  keylist.reserve(num_keys);
2083  auto rank = world.rank();
2084  for (int k = 0; k < num_keys; ++k) {
2085  keyT key;
2086  pos = unpack(key, msg->bytes, pos);
2087  assert(keymap(key) == rank);
2088  keylist.push_back(std::move(key));
2089  }
2090  key_end_pos = pos;
2091  /* jump back to the beginning of the message to get the value */
2092  pos = 0;
2093  // case 1
2094  if constexpr (!ttg::meta::is_void_v<valueT>) {
2095  using decvalueT = std::decay_t<valueT>;
2096  int32_t num_iovecs = msg->tt_id.num_iovecs;
2097  //bool inline_data = msg->inline_data;
2101  using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2102 
2103  /* unpack the metadata */
2104  metadata_t metadata;
2105  pos = unpack(metadata, msg->bytes, pos);
2106 
2107  //std::cout << "set_arg_from_msg splitmd num_iovecs " << num_iovecs << std::endl;
2108 
2109  copy = detail::create_new_datacopy(descr.create_from_metadata(metadata));
2110  } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
2111  copy = detail::create_new_datacopy(decvalueT{});
2112 #if 0
2113  // TODO: first attempt at sending directly to the device
2114  parsec_gpu_data_copy_t* gpu_elem;
2115  gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2116  int i = detail::first_device_id;
2117  int devid = detail::first_device_id;
2118  while (i < parsec_nb_devices) {
2119  if (nullptr == gpu_elem) {
2120  gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2121  gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2122  gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2123  gpu_elem->version = 0;
2124  gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2125  }
2126  if (nullptr == gpu_elem->device_private) {
2127  gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2128  if (nullptr == gpu_elem->device_private) {
2129  devid++;
2130  continue;
2131  }
2132  break;
2133  }
2134  }
2135 #endif // 0
2136  /* unpack the object, potentially discovering iovecs */
2137  pos = unpack(*static_cast<decvalueT *>(copy->get_ptr()), msg->bytes, pos);
2138  }
2139 
2140  if (num_iovecs == 0) {
2141  set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2142  } else {
2143  /* unpack the header and start the RMA transfers */
2144 
2145  /* get the remote rank */
2146  int remote = msg->tt_id.sender;
2147  assert(remote < world.size());
2148 
2149  auto &val = *static_cast<decvalueT *>(copy->get_ptr());
2150 
2151  bool inline_data = msg->tt_id.inline_data;
2152 
2153  int nv = 0;
2154  parsec_ce_tag_t cbtag;
2155  /* start the RMA transfers */
2156  auto create_activation_fn = [&]() {
2157  /* extract the callback tag */
2158  std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
2159  pos += sizeof(cbtag);
2160 
2161  /* create the value from the metadata */
2162  auto activation = new detail::rma_delayed_activate(
2163  std::move(keylist), copy, num_iovecs, [this](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
2164  set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2165  this->world.impl().decrement_inflight_msg();
2166  });
2167  return activation;
2168  };
2169  auto read_inline_data = [&](auto&& iovec){
2170  /* unpack the data from the message */
2171  ++nv;
2172  std::memcpy(iovec.data, msg->bytes + pos, iovec.num_bytes);
2173  pos += iovec.num_bytes;
2174  };
2175  auto handle_iovec_fn = [&](auto&& iovec, auto activation) {
2176  using ActivationT = std::decay_t<decltype(*activation)>;
2177 
2178  ++nv;
2179  parsec_ce_mem_reg_handle_t rreg;
2180  int32_t rreg_size_i;
2181  std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i));
2182  pos += sizeof(rreg_size_i);
2183  rreg = static_cast<parsec_ce_mem_reg_handle_t>(msg->bytes + pos);
2184  pos += rreg_size_i;
2185  // std::intptr_t *fn_ptr = reinterpret_cast<std::intptr_t *>(msg->bytes + pos);
2186  // pos += sizeof(*fn_ptr);
2187  std::intptr_t fn_ptr;
2188  std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr));
2189  pos += sizeof(fn_ptr);
2190 
2191  /* register the local memory */
2192  parsec_ce_mem_reg_handle_t lreg;
2193  size_t lreg_size;
2194  parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2195  iovec.num_bytes, &lreg, &lreg_size);
2196  world.impl().increment_inflight_msg();
2197  /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
2198  //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
2199  parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iovec.num_bytes, remote,
2200  &detail::get_complete_cb<ActivationT>, activation,
2201  /*world.impl().parsec_ttg_rma_tag()*/
2202  cbtag, &fn_ptr, sizeof(std::intptr_t));
2203  };
2206  if (inline_data) {
2207  for (auto&& iov : descr.get_data(val)) {
2208  read_inline_data(iov);
2209  }
2210  } else {
2211  auto activation = create_activation_fn();
2212  for (auto&& iov : descr.get_data(val)) {
2213  handle_iovec_fn(iov, activation);
2214  }
2215  }
2216  } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
2217  if (inline_data) {
2218  detail::foreach_parsec_data(val, [&](parsec_data_t* data){
2219  read_inline_data(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private});
2220  });
2221  } else {
2222  auto activation = create_activation_fn();
2223  detail::foreach_parsec_data(val, [&](parsec_data_t* data){
2224  handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private}, activation);
2225  });
2226  }
2227  }
2228 
2229  assert(num_iovecs == nv);
2230  assert(size == (key_end_pos + sizeof(msg_header_t)));
2231 
2232  if (inline_data) {
2233  set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2234  }
2235  }
2236  // case 2 and 3
2237  } else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2238  for (auto &&key : keylist) {
2239  set_arg<i, keyT, ttg::Void>(key, ttg::Void{});
2240  }
2241  }
2242  // case 4
2243  } else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2244  using decvalueT = std::decay_t<valueT>;
2245  decvalueT val;
2246  /* TODO: handle split-metadata case as with non-void keys */
2247  unpack(val, msg->bytes, 0);
2248  set_arg<i, keyT, valueT>(std::move(val));
2249  // case 5 and 6
2250  } else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2251  set_arg<i, keyT, ttg::Void>(ttg::Void{});
2252  } else { // unreachable
2253  ttg::abort();
2254  }
2255  }
2256 
2257  template <std::size_t i>
2258  void finalize_argstream_from_msg(void *data, std::size_t size) {
2259  using msg_t = detail::msg_t;
2260  msg_t *msg = static_cast<msg_t *>(data);
2261  if constexpr (!ttg::meta::is_void_v<keyT>) {
2262  /* unpack the key */
2263  uint64_t pos = 0;
2264  auto rank = world.rank();
2265  keyT key;
2266  pos = unpack(key, msg->bytes, pos);
2267  assert(keymap(key) == rank);
2268  finalize_argstream<i>(key);
2269  } else {
2270  auto rank = world.rank();
2271  assert(keymap() == rank);
2272  finalize_argstream<i>();
2273  }
2274  }
2275 
2276  template <std::size_t i>
2277  void argstream_set_size_from_msg(void *data, std::size_t size) {
2278  using msg_t = detail::msg_t;
2279  auto msg = static_cast<msg_t *>(data);
2280  uint64_t pos = 0;
2281  if constexpr (!ttg::meta::is_void_v<keyT>) {
2282  /* unpack the key */
2283  auto rank = world.rank();
2284  keyT key;
2285  pos = unpack(key, msg->bytes, pos);
2286  assert(keymap(key) == rank);
2287  std::size_t argstream_size;
2288  pos = unpack(argstream_size, msg->bytes, pos);
2289  set_argstream_size<i>(key, argstream_size);
2290  } else {
2291  auto rank = world.rank();
2292  assert(keymap() == rank);
2293  std::size_t argstream_size;
2294  pos = unpack(argstream_size, msg->bytes, pos);
2295  set_argstream_size<i>(argstream_size);
2296  }
2297  }
2298 
2299  template <std::size_t i>
2300  void get_from_pull_msg(void *data, std::size_t size) {
2301  using msg_t = detail::msg_t;
2302  msg_t *msg = static_cast<msg_t *>(data);
2303  auto &in = std::get<i>(input_terminals);
2304  if constexpr (!ttg::meta::is_void_v<keyT>) {
2305  /* unpack the key */
2306  uint64_t pos = 0;
2307  keyT key;
2308  pos = unpack(key, msg->bytes, pos);
2309  set_arg<i>(key, (in.container).get(key));
2310  }
2311  }
2312 
  /// Sets the i-th input argument locally for the task identified by \p key
  /// (non-void key, non-void value); perfect-forwards the value to set_arg_local_impl.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      const Key &key, Value &&value) {
    set_arg_local_impl<i>(key, std::forward<Value>(value));
  }
2318 
  /// Sets the i-th input argument locally for a keyless TT (void key, non-void
  /// value); perfect-forwards the value with a ttg::Void placeholder key.
  template <std::size_t i, typename Key = keyT, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      Value &&value) {
    set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value));
  }
2324 
  /// Sets the i-th input argument locally from a const lvalue (non-void key);
  /// the value will be copied by set_arg_local_impl as needed.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      const Key &key, const Value &value) {
    set_arg_local_impl<i>(key, value);
  }
2330 
  /// Sets the i-th input argument locally from a const lvalue for a keyless TT
  /// (void key); the value will be copied by set_arg_local_impl as needed.
  template <std::size_t i, typename Key = keyT, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      const Value &value) {
    set_arg_local_impl<i>(ttg::Void{}, value);
  }
2336 
  /// Sets the i-th input argument locally for a keyless TT from a shared_ptr;
  /// dereferences the pointer and forwards the value by const reference
  /// (the pointee is copied, ownership of the shared_ptr is not transferred).
  template <std::size_t i, typename Key = keyT, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      std::shared_ptr<const Value> &valueptr) {
    set_arg_local_impl<i>(ttg::Void{}, *valueptr);
  }
2342 
2343  template <typename Key>
2344  task_t *create_new_task(const Key &key) {
2345  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2346  auto &world_impl = world.impl();
2347  task_t *newtask;
2348  parsec_thread_mempool_t *mempool = get_task_mempool();
2349  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2350  int32_t priority = 0;
2351  if constexpr (!keyT_is_Void) {
2352  priority = priomap(key);
2353  /* placement-new the task */
2354  newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority);
2355  } else {
2356  priority = priomap();
2357  /* placement-new the task */
2358  newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority);
2359  }
2360 
2361  for (int i = 0; i < static_stream_goal.size(); ++i) {
2362  newtask->streams[i].goal = static_stream_goal[i];
2363  }
2364 
2365  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task");
2366  return newtask;
2367  }
2368 
2369 
2370  template <std::size_t i>
2372  /* make sure we can reuse the existing memory pool and don't have to create a new one */
2373  static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
2374  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2375  auto &world_impl = world.impl();
2376  detail::reducer_task_t *newtask;
2377  parsec_thread_mempool_t *mempool = get_task_mempool();
2378  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2379  // use the priority of the task we stream into
2380  int32_t priority = 0;
2381  if constexpr (!keyT_is_Void) {
2382  priority = priomap(task->key);
2383  ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task");
2384  } else {
2385  priority = priomap();
2386  ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
2387  }
2388  /* placement-new the task */
2389  newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
2390  world_impl.taskpool(), priority, is_first);
2391 
2392  return newtask;
2393  }
2394 
2395 
2396  // Used to set the i'th argument
2397  template <std::size_t i, typename Key, typename Value>
2398  void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr,
2399  parsec_task_t **task_ring = nullptr) {
2400  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2401  constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2402  constexpr const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2403  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2404 
2405 
2406  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
2407 
2408  parsec_key_t hk = 0;
2409  if constexpr (!keyT_is_Void) {
2410  hk = reinterpret_cast<parsec_key_t>(&key);
2411  assert(keymap(key) == world.rank());
2412  }
2413 
2414  task_t *task;
2415  auto &world_impl = world.impl();
2416  auto &reducer = std::get<i>(input_reducers);
2417  bool release = false;
2418  bool remove_from_hash = true;
2419 #if defined(PARSEC_PROF_GRAPHER)
2420  bool discover_task = true;
2421 #endif
2422  bool get_pull_data = false;
2423  bool has_lock = false;
2424  /* If we have only one input and no reducer on that input we can skip the hash table */
2425  if (numins > 1 || reducer) {
2426  has_lock = true;
2427  parsec_hash_table_lock_bucket(&tasks_table, hk);
2428  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
2429  task = create_new_task(key);
2430  world_impl.increment_created();
2431  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
2432  get_pull_data = !is_lazy_pull();
2433  if( world_impl.dag_profiling() ) {
2434 #if defined(PARSEC_PROF_GRAPHER)
2435  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2436  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2437 #endif
2438  }
2439  } else if (!reducer && numins == (task->in_data_count + 1)) {
2440  /* remove while we have the lock */
2441  parsec_hash_table_nolock_remove(&tasks_table, hk);
2442  remove_from_hash = false;
2443  }
2444  /* if we have a reducer, we need to hold on to the lock for just a little longer */
2445  if (!reducer) {
2446  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2447  has_lock = false;
2448  }
2449  } else {
2450  task = create_new_task(key);
2451  world_impl.increment_created();
2452  remove_from_hash = false;
2453  if( world_impl.dag_profiling() ) {
2454 #if defined(PARSEC_PROF_GRAPHER)
2455  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2456  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2457 #endif
2458  }
2459  }
2460 
2461  if( world_impl.dag_profiling() ) {
2462 #if defined(PARSEC_PROF_GRAPHER)
2463  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2465  char orig_str[32];
2466  char dest_str[32];
2467  if(orig_index >= 0) {
2468  snprintf(orig_str, 32, "%d", orig_index);
2469  } else {
2470  strncpy(orig_str, "_", 32);
2471  }
2472  snprintf(dest_str, 32, "%lu", i);
2473  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2474  .flow_index = 0, .flow_datatype_mask = ~0 };
2475  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2476  .flow_index = 0, .flow_datatype_mask = ~0 };
2477  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, discover_task ? 1 : 0, &orig, &dest);
2478  }
2479 #endif
2480  }
2481 
2482  auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
2483  detail::ttg_data_copy_t *copy = copy_in;
2484  if (nullptr == copy && nullptr != detail::parsec_ttg_caller) {
2486  }
2487  if (nullptr != copy) {
2488  /* retain the data copy */
2489  copy = detail::register_data_copy<valueT>(copy, task, is_const);
2490  } else {
2491  /* create a new copy */
2492  copy = detail::create_new_datacopy(std::forward<Value>(value));
2493  if (!is_const) {
2494  copy->mark_mutable();
2495  }
2496  }
2497  return copy;
2498  };
2499 
2500  if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
2501  auto submit_reducer_task = [&](auto *parent_task){
2502  /* check if we need to create a task */
2503  std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2504  //std::cout << "submit_reducer_task " << key << " c " << c << std::endl;
2505  if (0 == c) {
2506  /* we are responsible for creating the reduction task */
2507  detail::reducer_task_t *reduce_task;
2508  reduce_task = create_new_reducer_task<i>(parent_task, false);
2509  reduce_task->release_task(reduce_task); // release immediately
2510  }
2511  };
2512 
2513  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
2514  // have a value already? if not, set, otherwise reduce
2515  detail::ttg_data_copy_t *copy = nullptr;
2516  if (nullptr == (copy = task->copies[i])) {
2517  using decay_valueT = std::decay_t<valueT>;
2518 
2519  /* first input value, create a task and bind it to the copy */
2520  //std::cout << "Creating new reducer task for " << key << std::endl;
2521  detail::reducer_task_t *reduce_task;
2522  reduce_task = create_new_reducer_task<i>(task, true);
2523 
2524  /* protected by the bucket lock */
2525  task->streams[i].size = 1;
2526  task->streams[i].reduce_count.store(1, std::memory_order_relaxed);
2527 
2528  /* get the copy to use as input for this task */
2529  detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);
2530 
2531  /* put the copy into the task */
2532  task->copies[i] = copy;
2533 
2534  /* release the task if we're not deferred
2535  * TODO: can we delay that until we get the second value?
2536  */
2537  if (copy->get_next_task() != &reduce_task->parsec_task) {
2538  reduce_task->release_task(reduce_task);
2539  }
2540 
2541  /* now we can unlock the bucket */
2542  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2543  } else {
2544  /* unlock the bucket, the lock is not needed anymore */
2545  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2546 
2547  /* get the copy to use as input for this task */
2548  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);
2549 
2550  /* enqueue the data copy to be reduced */
2551  parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
2552  submit_reducer_task(task);
2553  }
2554  } else {
2555  /* unlock the bucket, the lock is not needed anymore */
2556  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2557  /* submit reducer for void values to handle side effects */
2558  submit_reducer_task(task);
2559  }
2560  //if (release) {
2561  // parsec_hash_table_nolock_remove(&tasks_table, hk);
2562  // remove_from_hash = false;
2563  //}
2564  //parsec_hash_table_unlock_bucket(&tasks_table, hk);
2565  } else {
2566  /* unlock the bucket, the lock is not needed anymore */
2567  if (has_lock) {
2568  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2569  }
2570  /* whether the task needs to be deferred or not */
2571  if constexpr (!valueT_is_Void) {
2572  if (nullptr != task->copies[i]) {
2573  ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i);
2574  throw std::logic_error("bad set arg");
2575  }
2576 
2577  /* get the copy to use as input for this task */
2578  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);
2579 
2580  /* if we registered as a writer and were the first to register with this copy
2581  * we need to defer the release of this task to give other tasks a chance to
2582  * make a copy of the original data */
2583  release = (copy->get_next_task() != &task->parsec_task);
2584  task->copies[i] = copy;
2585  } else {
2586  release = true;
2587  }
2588  }
2589  task->remove_from_hash = remove_from_hash;
2590  if (release) {
2591  release_task(task, task_ring);
2592  }
2593  /* if not pulling lazily, pull the data here */
2594  if constexpr (!ttg::meta::is_void_v<keyT>) {
2595  if (get_pull_data) {
2596  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2597  }
2598  }
2599  }
2600 
2602  bool constrained = false;
2603  if (constraints_check.size() > 0) {
2604  if constexpr (ttg::meta::is_void_v<keyT>) {
2605  constrained = !constraints_check[0]();
2606  } else {
2607  constrained = !constraints_check[0](task->key);
2608  }
2609  }
2610  if (constrained) {
2611  // store the task so we can later access it once it is released
2612  parsec_hash_table_insert(&task_constraint_table, &task->tt_ht_item);
2613  }
2614  return !constrained;
2615  }
2616 
2617  template<typename Key = keyT>
2618  std::enable_if_t<ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid) {
2619  // check the next constraint, if any
2620  assert(cid < constraints_check.size());
2621  bool release = true;
2622  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2623  if (!constraints_check[i]()) {
2624  release = false;
2625  break;
2626  }
2627  }
2628  if (release) {
2629  // no constraint blocked us
2630  task_t *task;
2631  parsec_key_t hk = 0;
2632  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2633  assert(task != nullptr);
2634  auto &world_impl = world.impl();
2635  parsec_execution_stream_t *es = world_impl.execution_stream();
2636  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2637  __parsec_schedule_vp(es, vp_task_rings, 0);
2638  }
2639  }
2640 
  /// Notifies the TT that constraint \c cid has been satisfied for a batch of keys.
  /// For every key whose remaining constraints all pass, the stored task is removed
  /// from the constraint table and chained into a ring that is scheduled once at the end.
2641  template<typename Key = keyT>
2642  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid, const std::span<Key>& keys) {
2643  assert(cid < constraints_check.size());
2644  parsec_task_t *task_ring = nullptr;
2645  for (auto& key : keys) {
2646  task_t *task;
  // a later constraint may still block this key; check all constraints after cid
2647  bool release = true;
2648  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2649  if (!constraints_check[i](key)) {
2650  release = false;
2651  break;
2652  }
2653  }
2654 
2655  if (release) {
2656  // no constraint blocked this task, so go ahead and release
  // NOTE(review): the table key is the address of the span element — assumes the
  // constraint table was populated with the same addressing scheme; confirm against callers
2657  auto hk = reinterpret_cast<parsec_key_t>(&key);
2658  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2659  assert(task != nullptr);
2660  if (task_ring == nullptr) {
2661  /* the first task is set directly */
2662  task_ring = &task->parsec_task;
2663  } else {
2664  /* push into the ring */
2665  parsec_list_item_ring_push_sorted(&task_ring->super, &task->parsec_task.super,
2666  offsetof(parsec_task_t, priority));
2667  }
2668  }
2669  }
  /* submit all released tasks with a single scheduler invocation */
2670  if (nullptr != task_ring) {
2671  auto &world_impl = world.impl();
2672  parsec_execution_stream_t *es = world_impl.execution_stream();
2673  parsec_task_t *vp_task_rings[1] = { task_ring };
2674  __parsec_schedule_vp(es, vp_task_rings, 0);
2675  }
2676  }
2677 
  /// Accounts for one newly arrived input of \c task and, once all inputs are
  /// present, submits the task to the scheduler (directly or into *task_ring).
  /// If inputs are still missing and lazy pulling is enabled, triggers the pull
  /// terminals once only the pull inputs remain outstanding.
2678  void release_task(task_t *task,
2679  parsec_task_t **task_ring = nullptr) {
2680  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2681 
2682  /* if remove_from_hash == false, someone has already removed the task from the hash table
2683  * so we know that the task is ready, no need to do atomic increments here */
2684  bool is_ready = !task->remove_from_hash;
2685  int32_t count;
2686  if (is_ready) {
2687  count = numins;
2688  } else {
  // atomically count this input; +1 converts fetch-then-increment to the new value
2689  count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1;
2690  assert(count <= self.dependencies_goal);
2691  }
2692 
2693  auto &world_impl = world.impl();
2694  ttT *baseobj = task->tt;
2695 
  /* all inputs are present: the task is ready for execution */
2696  if (count == numins) {
2697  parsec_execution_stream_t *es = world_impl.execution_stream();
2698  parsec_key_t hk = task->pkey();
2699  if (tracing()) {
2700  if constexpr (!keyT_is_Void) {
2701  ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": submitting task for op ");
2702  } else {
2703  ttg::trace(world.rank(), ":", get_name(), ": submitting task for op ");
2704  }
2705  }
2706  if (task->remove_from_hash) parsec_hash_table_remove(&tasks_table, hk);
2707 
  /* constrained tasks are parked by check_constraints() and scheduled later */
2708  if (check_constraints(task)) {
2709  if (nullptr == task_ring) {
2710  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2711  __parsec_schedule_vp(es, vp_task_rings, 0);
2712  } else if (*task_ring == nullptr) {
2713  /* the first task is set directly */
2714  *task_ring = &task->parsec_task;
2715  } else {
2716  /* push into the ring */
2717  parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->parsec_task.super,
2718  offsetof(parsec_task_t, priority));
2719  }
2720  }
2721  } else if constexpr (!ttg::meta::is_void_v<keyT>) {
  // only pull inputs are missing now; fetch them lazily if so configured
2722  if ((baseobj->num_pullins + count == numins) && baseobj->is_lazy_pull()) {
2723  /* lazily pull the pull terminal data */
2724  baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2725  }
2726  }
2727  }
2728 
2729  // cases 1+2
2730  template <std::size_t i, typename Key, typename Value>
2731  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(const Key &key,
2732  Value &&value) {
2733  set_arg_impl<i>(key, std::forward<Value>(value));
2734  }
2735 
2736  // cases 4+5+6
2737  template <std::size_t i, typename Key, typename Value>
2738  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
2739  set_arg_impl<i>(ttg::Void{}, std::forward<Value>(value));
2740  }
2741 
2742  template <std::size_t i, typename Key = keyT>
2743  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_arg() {
2744  set_arg_impl<i>(ttg::Void{}, ttg::Void{});
2745  }
2746 
2747  // case 3
2748  template <std::size_t i, typename Key>
2749  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_arg(const Key &key) {
2750  set_arg_impl<i>(key, ttg::Void{});
2751  }
2752 
  /// Decides whether a value (plus its key) is small enough to be shipped inline
  /// inside the active message instead of via RMA get. Device-capable TTs never
  /// inline, since the data may reside on an accelerator.
  /// @return true if key + metadata + payload fit below detail::max_inline_size
2753  template<typename Value, typename Key>
2754  bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) {
2755  if constexpr (derived_has_device_op()) {
2756  /* don't inline if data is possibly on the device */
2757  return false;
2758  }
2759  /* non-device data */
2760  using decvalueT = std::decay_t<Value>;
2761  bool inline_data = false;
2762  /* check whether to send data in inline */
2763  std::size_t iov_size = 0;
2764  std::size_t metadata_size = 0;
2765  if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
  // presumably `descr` is a ttg::SplitMetadataDescriptor<decvalueT> declared just
  // above this line (lost in extraction) -- TODO confirm against upstream source
2767  auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
2768  iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2769  [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
2770  auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
2771  metadata_size = ttg::default_data_descriptor<decltype(metadata)>::payload_size(&metadata);
2772  } else {
2773  /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */
2774  metadata_size = ttg::default_data_descriptor<ttg::meta::remove_cvr_t<Value>>::payload_size(value_ptr);
  // sum up the sizes of all parsec_data_t chunks contained in the value
2775  detail::foreach_parsec_data(*value_ptr, [&](parsec_data_t* data){ iov_size += data->nb_elts; });
2776  }
2777  /* key is packed at the end */
2778  std::size_t key_pack_size = ttg::default_data_descriptor<Key>::payload_size(&key);
2779  std::size_t pack_size = key_pack_size + metadata_size + iov_size;
2780  if (pack_size < detail::max_inline_size) {
2781  inline_data = true;
2782  }
2783  return inline_data;
2784  }
2785 
2786  // Used to set the i'th argument
  /// Delivers \c value for input terminal \c i of the task identified by \c key.
  /// Local deliveries are forwarded to set_arg_local_impl(); remote ones are
  /// packed into an active message (inline if small enough, otherwise with RMA
  /// registration handles the target uses to get the payload).
2787  template <std::size_t i, typename Key, typename Value>
2788  void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr) {
2789  int owner;
2790  using decvalueT = std::decay_t<Value>;
2791  using norefvalueT = std::remove_reference_t<Value>;
2792  norefvalueT *value_ptr = &value;
2793 
2794 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2795  if(world.impl().profiling()) {
2796  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2797  }
2798 #endif
2799 
  /* the keymap determines which rank owns the target task */
2800  if constexpr (!ttg::meta::is_void_v<Key>)
2801  owner = keymap(key);
2802  else
2803  owner = keymap();
2804  if (owner == world.rank()) {
2805  if constexpr (!ttg::meta::is_void_v<keyT>)
2806  set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2807  else
2808  set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value), copy_in);
2809 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2810  if(world.impl().profiling()) {
2811  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2812  }
2813 #endif
2814  return;
2815  }
2816  // the target task is remote. Pack the information and send it to
2817  // the corresponding peer.
2818  // TODO do we need to copy value?
2819  using msg_t = detail::msg_t;
2820  auto &world_impl = world.impl();
2821  uint64_t pos = 0;
2822  int num_iovecs = 0;
2823  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
2824  msg_header_t::MSG_SET_ARG, i, world_impl.rank(), 1);
2825 
2826  if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2827 
2828  detail::ttg_data_copy_t *copy = copy_in;
2829  /* make sure we have a data copy to register with */
2830  if (nullptr == copy) {
  // presumably the current caller's copy of `value` is looked up here
  // (line lost in extraction) -- TODO confirm against upstream source
2832  if (nullptr == copy) {
2833  // We need to create a copy for this data, as it does not exist yet.
2834  copy = detail::create_new_datacopy(std::forward<Value>(value));
2835  // use the new value from here on out
2836  value_ptr = static_cast<norefvalueT*>(copy->get_ptr());
2837  }
2838  }
2839 
2840  bool inline_data = can_inline_data(value_ptr, copy, key, 1);
2841  msg->tt_id.inline_data = inline_data;
2842 
  /* for non-inline transfers the message starts with the completion-callback tag */
2843  auto write_header_fn = [&]() {
2844  if (!inline_data) {
2845  /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
2846  * raw function pointer instead of a preregistered AM tag, so play that game.
2847  * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
2848  parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
2849  std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
2850  pos += sizeof(cbtag);
2851  }
2852  };
  /* per-iovec packing: either copy the bytes inline or register the memory
  * and pack the registration handle plus a cleanup callback pointer */
2853  auto handle_iovec_fn = [&](auto&& iovec){
2854 
2855  if (inline_data) {
2856  /* inline data is packed right after the tt_id in the message */
2857  std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
2858  pos += iovec.num_bytes;
2859  } else {
2860 
2865  copy = detail::register_data_copy<decvalueT>(copy, nullptr, true);
2866  parsec_ce_mem_reg_handle_t lreg;
2867  size_t lreg_size;
2868  /* TODO: only register once when we can broadcast the data! */
2869  parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2870  iovec.num_bytes, &lreg, &lreg_size);
2871  auto lreg_ptr = std::shared_ptr<void>{lreg, [](void *ptr) {
2872  parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr;
2873  parsec_ce.mem_unregister(&memreg);
2874  }};
2875  int32_t lreg_size_i = lreg_size;
2876  std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
2877  pos += sizeof(lreg_size_i);
2878  std::memcpy(msg->bytes + pos, lreg, lreg_size);
2879  pos += lreg_size;
2880  //std::cout << "set_arg_impl lreg " << lreg << std::endl;
2881  /* TODO: can we avoid the extra indirection of going through std::function? */
2882  std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
2883  /* shared_ptr of value and registration captured by value so resetting
2884  * them here will eventually release the memory/registration */
2886  lreg_ptr.reset();
2887  });
  // the raw function pointer travels in the message; the remote side invokes
  // it via get_remote_complete_cb once the RMA transfer finished
2888  std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
2889  std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
2890  pos += sizeof(fn_ptr);
2891  }
2892  };
2893 
2894  if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
  // presumably `descr` is a ttg::SplitMetadataDescriptor<decvalueT> declared just
  // above this line (lost in extraction) -- TODO confirm against upstream source
2896  auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
2897  num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2898  /* pack the metadata */
2899  auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
2900  pos = pack(metadata, msg->bytes, pos);
2901  //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl;
2902  write_header_fn();
2903  for (auto&& iov : iovs) {
2904  handle_iovec_fn(iov);
2905  }
2906  } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
2907  /* serialize the object */
2908  pos = pack(*value_ptr, msg->bytes, pos, copy);
2909  detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovecs; });
2910  //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl;
2911  /* handle any iovecs contained in it */
2912  write_header_fn();
2913  detail::foreach_parsec_data(value, [&](parsec_data_t *data){
2914  handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private});
2915  });
2916  }
2917 
2918  msg->tt_id.num_iovecs = num_iovecs;
2919  }
2920 
2921  /* pack the key */
2922  msg->tt_id.num_keys = 0;
2923  msg->tt_id.key_offset = pos;
2924  if constexpr (!ttg::meta::is_void_v<Key>) {
2925  size_t tmppos = pack(key, msg->bytes, pos);
2926  pos = tmppos;
2927  msg->tt_id.num_keys = 1;
2928  }
2929 
2930  parsec_taskpool_t *tp = world_impl.taskpool();
2931  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2932  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2933  //std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
2934  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
2935  sizeof(msg_header_t) + pos);
2936 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2937  if(world.impl().profiling()) {
2938  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2939  }
2940 #endif
2941 #if defined(PARSEC_PROF_GRAPHER)
  /* record the dependency edge in the DAG grapher; the temporary task only
  * exists to obtain the destination task descriptor */
2942  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2944  char orig_str[32];
2945  char dest_str[32];
2946  if(orig_index >= 0) {
2947  snprintf(orig_str, 32, "%d", orig_index);
2948  } else {
2949  strncpy(orig_str, "_", 32);
2950  }
2951  snprintf(dest_str, 32, "%lu", i);
2952  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2953  .flow_index = 0, .flow_datatype_mask = ~0 };
2954  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2955  .flow_index = 0, .flow_datatype_mask = ~0 };
2956  task_t *task = create_new_task(key);
2957  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, 0, &orig, &dest);
2958  delete task;
2959  }
2960 #endif
2961  }
2962 
  /// Delivers \c value to input terminal \c i of all local tasks whose keys are
  /// in [begin, end). Ready tasks are collected into a ring and submitted to the
  /// scheduler with a single call at the end.
2963  template <int i, typename Iterator, typename Value>
2964  void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value) {
2965 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2966  if(world.impl().profiling()) {
2967  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2968  }
2969 #endif
2970  parsec_task_t *task_ring = nullptr;
2971  detail::ttg_data_copy_t *copy = nullptr;
2972  if (nullptr != detail::parsec_ttg_caller) {
  // presumably the caller's data copy of `value` is looked up here so all local
  // deliveries share one copy (line lost in extraction) -- TODO confirm upstream
2974  }
2975 
2976  for (auto it = begin; it != end; ++it) {
2977  set_arg_local_impl<i>(*it, value, copy, &task_ring);
2978  }
2979  /* submit all ready tasks at once */
2980  if (nullptr != task_ring) {
2981  parsec_task_t *vp_task_ring[1] = { task_ring };
2982  __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2983  }
2984 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2985  if(world.impl().profiling()) {
2986  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2987  }
2988 #endif
2989  }
2990 
  /// Broadcasts \c value to input terminal \c i of all tasks named in \c keylist,
  /// local and remote. The value is packed once; for each remote owner one active
  /// message is sent carrying that owner's keys (plus inline payload or RMA
  /// registration handles). Local keys are handled via broadcast_arg_local().
2991  template <std::size_t i, typename Key, typename Value>
2992  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2993  void>
2994  broadcast_arg(const ttg::span<const Key> &keylist, const Value &value) {
2995  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2996  auto world = ttg_default_execution_context();
2997  auto np = world.size();
2998  int rank = world.rank();
2999  uint64_t pos = 0;
  /* fast path: if no key maps to a remote rank, skip all packing below */
3000  bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3001  [&](const Key &key) { return keymap(key) != rank; });
3002 
3003  if (have_remote) {
3004  using decvalueT = std::decay_t<Value>;
3005 
3006  /* sort the input key list by owner and check whether there are remote keys */
3007  std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
3008  std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](const Key &a, const Key &b) mutable {
3009  int rank_a = keymap(a);
3010  int rank_b = keymap(b);
3011  // sort so that the keys for my rank are first, rank+1 next, ..., wrapping around to 0
3012  int pos_a = (rank_a + np - rank) % np;
3013  int pos_b = (rank_b + np - rank) % np;
3014  return pos_a < pos_b;
3015  });
3016 
3017  /* Assuming there are no local keys, will be updated while iterating over the keys */
3018  auto local_begin = keylist_sorted.end();
3019  auto local_end = keylist_sorted.end();
3020 
3021  int32_t num_iovs = 0;
3022 
  // presumably `copy` is obtained from the caller's task here
  // (lines lost in extraction) -- TODO confirm against upstream source
3025  assert(nullptr != copy);
3026 
3027  using msg_t = detail::msg_t;
3028  auto &world_impl = world.impl();
3029  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3030  msg_header_t::MSG_SET_ARG, i, world_impl.rank());
3031 
3032  /* check if we inline the data */
3033  /* TODO: this assumes the worst case: that all keys are packed at once (i.e., go to the same remote). Can we do better?*/
3034  bool inline_data = can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
3035  msg->tt_id.inline_data = inline_data;
3036 
3037  std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
3038  auto write_iov_header = [&](){
3039  if (!inline_data) {
3040  /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
3041  * raw function pointer instead of a preregistered AM tag, so play that game.
3042  * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
3043  parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
3044  std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
3045  pos += sizeof(cbtag);
3046  }
3047  };
  /* per-iovec handler: inline-copy the bytes, or register the memory once and
  * stash the registration handle for reuse in every per-owner message */
3048  auto handle_iov_fn = [&](auto&& iovec){
3049  if (inline_data) {
3050  /* inline data is packed right after the tt_id in the message */
3051  std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
3052  pos += iovec.num_bytes;
3053  } else {
3054  parsec_ce_mem_reg_handle_t lreg;
3055  size_t lreg_size;
3056  parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
3057  iovec.num_bytes, &lreg, &lreg_size);
3058  /* TODO: use a static function for deregistration here? */
3059  memregs.push_back(std::make_pair(static_cast<int32_t>(lreg_size),
3060  /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */
3061  std::shared_ptr<void>{lreg, [](void *ptr) {
3062  parsec_ce_mem_reg_handle_t memreg =
3063  (parsec_ce_mem_reg_handle_t)ptr;
3064  //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
3065  parsec_ce.mem_unregister(&memreg);
3066  }}));
3067  //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
3068  }
3069  };
3070 
3071  if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
  // presumably `descr` is a ttg::SplitMetadataDescriptor<decvalueT> declared just
  // above this line (lost in extraction) -- TODO confirm against upstream source
3073  /* pack the metadata */
3074  auto metadata = descr.get_metadata(value);
3075  pos = pack(metadata, msg->bytes, pos);
3076  auto iovs = descr.get_data(*const_cast<decvalueT *>(&value));
3077  num_iovs = std::distance(std::begin(iovs), std::end(iovs));
3078  memregs.reserve(num_iovs);
3079  write_iov_header();
3080  for (auto &&iov : iovs) {
3081  handle_iov_fn(iov);
3082  }
3083  //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl;
3084  } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
3085  /* serialize the object once */
3086  pos = pack(value, msg->bytes, pos, copy);
3087  detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovs; });
3088  memregs.reserve(num_iovs);
3089  write_iov_header();
3090  detail::foreach_parsec_data(value, [&](parsec_data_t *data){
3091  handle_iov_fn(ttg::iovec{data->nb_elts,
3092  data->device_copies[data->owner_device]->device_private});
3093  });
3094  }
3095 
3096  msg->tt_id.num_iovecs = num_iovs;
3097 
  /* remember where the shared payload ends so the per-owner sections can be rewritten */
3098  std::size_t save_pos = pos;
3099 
3100  parsec_taskpool_t *tp = world_impl.taskpool();
3101  for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {
3102 
3103  auto owner = keymap(*it);
3104  if (owner == rank) {
3105  local_begin = it;
3106  /* find first non-local key */
3107  local_end =
3108  std::find_if_not(++it, keylist_sorted.end(), [&](const Key &key) { return keymap(key) == rank; });
3109  it = local_end;
3110  continue;
3111  }
3112 
3113  /* rewind the buffer and start packing a new set of memregs and keys */
3114  pos = save_pos;
  /* re-pack the registration handles for this owner; each handle gets an extra
  * reader registered on the copy and a completion callback for cleanup */
3120  if (!inline_data) {
3121  for (int idx = 0; idx < num_iovs; ++idx) {
3122  // auto [lreg_size, lreg_ptr] = memregs[idx];
3123  int32_t lreg_size;
3124  std::shared_ptr<void> lreg_ptr;
3125  std::tie(lreg_size, lreg_ptr) = memregs[idx];
3126  std::memcpy(msg->bytes + pos, &lreg_size, sizeof(lreg_size));
3127  pos += sizeof(lreg_size);
3128  std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3129  pos += lreg_size;
3130  //std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
3131  /* mark another reader on the copy */
3132  copy = detail::register_data_copy<valueT>(copy, nullptr, true);
3133  /* create a function that will be invoked upon RMA completion at the target */
3134  std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
3135  /* shared_ptr of value and registration captured by value so resetting
3136  * them here will eventually release the memory/registration */
3138  lreg_ptr.reset();
3139  });
3140  std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
3141  std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
3142  pos += sizeof(fn_ptr);
3143  }
3144  }
3145 
3146  /* mark the beginning of the keys */
3147  msg->tt_id.key_offset = pos;
3148 
3149  /* pack all keys for this owner */
3150  int num_keys = 0;
3151  do {
3152  ++num_keys;
3153  pos = pack(*it, msg->bytes, pos);
3154  ++it;
3155  } while (it < keylist_sorted.end() && keymap(*it) == owner);
3156  msg->tt_id.num_keys = num_keys;
3157 
3158  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3159  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3160  //std::cout << "broadcast_arg send_am owner " << owner << std::endl;
3161  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3162  sizeof(msg_header_t) + pos);
3163  }
3164  /* handle local keys */
3165  broadcast_arg_local<i>(local_begin, local_end, value);
3166  } else {
3167  /* handle local keys */
3168  broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
3169  }
3170  }
3171 
3172  // Used by invoke to set all arguments associated with a task
3173  // Is: index sequence of elements in args
3174  // Js: index sequence of input terminals to set
3175  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
3176  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...>,
3177  std::index_sequence<Js...>, const Key &key,
3178  const std::tuple<Ts...> &args) {
3179  static_assert(sizeof...(Js) == sizeof...(Is));
3180  constexpr size_t js[] = {Js...};
3181  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3182  junk[0]++;
3183  }
3184 
3185  // Used by invoke to set all arguments associated with a task
3186  // Is: index sequence of input terminals to set
3187  template <typename Key, typename... Ts, size_t... Is>
3188  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
3189  const std::tuple<Ts...> &args) {
3190  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3191  }
3192 
3193  // Used by invoke to set all arguments associated with a task
3194  // Is: index sequence of elements in args
3195  // Js: index sequence of input terminals to set
3196  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
3197  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3198  const std::tuple<Ts...> &args) {
3199  static_assert(sizeof...(Js) == sizeof...(Is));
3200  constexpr size_t js[] = {Js...};
3201  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3202  junk[0]++;
3203  }
3204 
3205  // Used by invoke to set all arguments associated with a task
3206  // Is: index sequence of input terminals to set
3207  template <typename Key = keyT, typename... Ts, size_t... Is>
3208  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
3209  const std::tuple<Ts...> &args) {
3210  set_args(std::index_sequence_for<Ts...>{}, is, args);
3211  }
3212 
3213  public:
3216  template <std::size_t i>
3217  void set_static_argstream_size(std::size_t size) {
3218  assert(std::get<i>(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal");
3219  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
3220 
3221  this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
3222 
3223  // Check if stream is already bounded
3224  if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3225  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
3226  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
3227  }
3228 
3229  static_stream_goal[i] = size;
3230  }
3231 
  /// Sets the stream size for streaming input terminal \c i of the task with key \c key.
  /// If the task is owned by another rank, the request is forwarded via an active
  /// message; otherwise the goal is committed locally and, if the stream has already
  /// received enough inputs, the task is released.
3235  template <std::size_t i, typename Key>
3236  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_argstream_size(const Key &key, std::size_t size) {
3237  // preconditions
3238  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
3239  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
3240 
3241  // body
3242  const auto owner = keymap(key);
3243  if (owner != world.rank()) {
3244  ttg::trace(world.rank(), ":", get_name(), ":", key, " : forwarding stream size for terminal ", i);
3245  using msg_t = detail::msg_t;
3246  auto &world_impl = world.impl();
3247  uint64_t pos = 0;
  // presumably the message type argument (MSG_SET_ARGSTREAM_SIZE) is on the
  // continuation line lost in extraction -- TODO confirm against upstream source
3248  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3250  world_impl.rank(), 1);
3251  /* pack the key */
3252  pos = pack(key, msg->bytes, pos);
3253  pos = pack(size, msg->bytes, pos);
3254  parsec_taskpool_t *tp = world_impl.taskpool();
3255  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3256  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3257  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3258  sizeof(msg_header_t) + pos);
3259  } else {
3260  ttg::trace(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i);
3261 
  /* find or create the task under the bucket lock so concurrent senders see it */
3262  auto hk = reinterpret_cast<parsec_key_t>(&key);
3263  task_t *task;
3264  parsec_hash_table_lock_bucket(&tasks_table, hk);
3265  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
3266  task = create_new_task(key);
3267  world.impl().increment_created();
3268  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
3269  if( world.impl().dag_profiling() ) {
3270 #if defined(PARSEC_PROF_GRAPHER)
3271  parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
3272 #endif
3273  }
3274  }
3275  parsec_hash_table_unlock_bucket(&tasks_table, hk);
3276 
3277  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3278  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3279 
3280  // commit changes
3281  // 1) "lock" the stream by incrementing the reduce_count
3282  // 2) set the goal
3283  // 3) "unlock" the stream
3284  // only one thread will see the reduce_count be zero and the goal match the size
3285  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3286  task->streams[i].goal = size;
3287  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
  // c is the pre-decrement value: 1 means we were the only holder, so we may release
3288  if (1 == c && (task->streams[i].size >= size)) {
3289  release_task(task);
3290  }
3291  }
3292  }
3293 
  /// Void-key overload of set_argstream_size: sets the stream-reduction goal
  /// for input terminal @p i of the single (keyless) task instance.
  /// Forwards to the owning rank (per keymap()) via an active message when
  /// invoked remotely; otherwise updates/creates the local task entry (hash key 0).
3296  template <std::size_t i, typename Key = keyT>
3297  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_argstream_size(std::size_t size) {
3298  // preconditions
3299  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
3300  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
3301 
3302  // body
3303  const auto owner = keymap();
3304  if (owner != world.rank()) {
3305  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
3306  using msg_t = detail::msg_t;
3307  auto &world_impl = world.impl();
3308  uint64_t pos = 0;
3309  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3311  world_impl.rank(), 0);
3312  pos = pack(size, msg->bytes, pos);
3313  parsec_taskpool_t *tp = world_impl.taskpool();
3314  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3315  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3316  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3317  sizeof(msg_header_t) + pos);
3318  } else {
3319  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
3320 
  // keyless task: hash-table key is the constant 0
3321  parsec_key_t hk = 0;
3322  task_t *task;
3323  parsec_hash_table_lock_bucket(&tasks_table, hk);
3324  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
3325  task = create_new_task(ttg::Void{});
3326  world.impl().increment_created();
3327  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
3328  if( world.impl().dag_profiling() ) {
3329 #if defined(PARSEC_PROF_GRAPHER)
3330  parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
3331 #endif
3332  }
3333  }
3334  parsec_hash_table_unlock_bucket(&tasks_table, hk);
3335 
3336  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3337  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3338 
3339  // commit changes
3340  // 1) "lock" the stream by incrementing the reduce_count
3341  // 2) set the goal
3342  // 3) "unlock" the stream
3343  // only one thread will see the reduce_count be zero and the goal match the size
3344  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3345  task->streams[i].goal = size;
3346  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
  // last holder + enough inputs already received => the task is ready
3347  if (1 == c && (task->streams[i].size >= size)) {
3348  release_task(task);
3349  }
3350  }
3351  }
3352 
  /// Finalizes the stream on input terminal @p i for the task identified by
  /// @p key: no further inputs will arrive, so the goal is forced to 1 and the
  /// task is released if at least one input was already received.
  /// Remote invocations are forwarded to the owning rank via an active message.
  /// @throws std::runtime_error if no task entry exists (stream never received data)
3354  template <std::size_t i, typename Key>
3355  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> finalize_argstream(const Key &key) {
3356  // preconditions
3357  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3358 
3359  // body
3360  const auto owner = keymap(key);
3361  if (owner != world.rank()) {
3362  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
3363  using msg_t = detail::msg_t;
3364  auto &world_impl = world.impl();
3365  uint64_t pos = 0;
3366  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3368  world_impl.rank(), 1);
3369  /* pack the key */
3370  pos = pack(key, msg->bytes, pos);
3371  parsec_taskpool_t *tp = world_impl.taskpool();
3372  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3373  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3374  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3375  sizeof(msg_header_t) + pos);
3376  } else {
3377  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
3378 
3379  auto hk = reinterpret_cast<parsec_key_t>(&key);
3380  task_t *task = nullptr;
  // NOTE(review): lookup is done with the locking find (bucket lock commented
  // out below); unlike set_argstream_size, a missing entry is an error here
3381  //parsec_hash_table_lock_bucket(&tasks_table, hk);
3382  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3383  ttg::print_error(world.rank(), ":", get_name(), ":", key,
3384  " : error finalize called on stream that never received an input data: ", i);
3385  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3386  }
3387 
3388  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3389  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3390 
3391  // commit changes
3392  // 1) "lock" the stream by incrementing the reduce_count
3393  // 2) set the goal
3394  // 3) "unlock" the stream
3395  // only one thread will see the reduce_count be zero and the goal match the size
3396  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3397  task->streams[i].goal = 1;
3398  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3399  if (1 == c && (task->streams[i].size >= 1)) {
3400  release_task(task);
3401  }
3402  }
3403  }
3404 
  /// Void-key overload of finalize_argstream: closes the stream on input
  /// terminal @p i of the single (keyless) task instance, forcing goal = 1.
  /// Remote invocations are forwarded to the owning rank.
  /// @throws std::runtime_error if the stream never received an input
3406  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
3407  std::enable_if_t<key_is_void, void> finalize_argstream() {
3408  // preconditions
3409  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3410 
3411  // body
3412  const auto owner = keymap();
3413  if (owner != world.rank()) {
3414  ttg::trace(world.rank(), ":", get_name(), ": forwarding stream finalize for terminal ", i);
3415  using msg_t = detail::msg_t;
3416  auto &world_impl = world.impl();
3417  uint64_t pos = 0;
3418  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3420  world_impl.rank(), 0);
3421  parsec_taskpool_t *tp = world_impl.taskpool();
3422  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3423  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3424  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3425  sizeof(msg_header_t) + pos);
3426  } else {
3427  ttg::trace(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i);
3428 
  // keyless task lives under hash key 0
3429  auto hk = static_cast<parsec_key_t>(0);
3430  task_t *task = nullptr;
3431  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3432  ttg::print_error(world.rank(), ":", get_name(),
3433  " : error finalize called on stream that never received an input data: ", i);
3434  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3435  }
3436 
3437  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3438  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3439 
3440  // commit changes
3441  // 1) "lock" the stream by incrementing the reduce_count
3442  // 2) set the goal
3443  // 3) "unlock" the stream
3444  // only one thread will see the reduce_count be zero and the goal match the size
3445  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3446  task->streams[i].goal = 1;
3447  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3448  if (1 == c && (task->streams[i].size >= 1)) {
3449  release_task(task);
3450  }
3451  }
3452  }
3453 
  /// Marks every device-resident parsec_data_t backing @p value as "pushout" on
  /// the current device task, forcing its data to be pushed back to the host.
  /// For each datum owned by a non-host device (owner_device != 0) it locates
  /// (or appends) the matching flow on the GPU task, flags it RW, and sets the
  /// corresponding pushout bit.
  /// Must be called from within a task body: relies on detail::parsec_ttg_caller.
3454  template<typename Value>
3455  void copy_mark_pushout(const Value& value) {
3456 
3457  assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task);
3458  parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task;
3459  auto check_parsec_data = [&](parsec_data_t* data) {
3460  if (data->owner_device != 0) {
3461  /* find the flow */
3462  int flowidx = 0;
  // scan the used flows (flags != ACCESS_NONE) for the one carrying this datum
3463  while (flowidx < MAX_PARAM_COUNT &&
3464  gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3465  if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) {
3466  /* found the right data, set the corresponding flow as pushout */
3467  break;
3468  }
3469  ++flowidx;
3470  }
3471  if (flowidx == MAX_PARAM_COUNT) {
3472  throw std::runtime_error("Cannot add more than MAX_PARAM_COUNT flows to a task!");
3473  }
3474  if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3475  /* no flow found, add one and mark it pushout */
  // device_copies[0] is the host-side copy of the datum
3476  detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0];
3477  gpu_task->flow_nb_elts[flowidx] = data->nb_elts;
3478  }
3479  /* need to mark the flow RW to make PaRSEC happy */
3480  ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3481  gpu_task->pushout |= 1<<flowidx;
3482  }
3483  };
3485  [&](parsec_data_t* data){
3486  check_parsec_data(data);
3487  });
3488  }
3489 
3490 
3491  /* check whether a data needs to be pushed out */
  /// Decides whether the data copy backing @p value must be pushed back to the
  /// host before it is sent to successors, and if so marks it pushout.
  /// Reasons to push out (checked in order): an active host-side reducer,
  /// const data previously modified when devices lack peer access, multiple
  /// readers/writers on the copy, a host-only (non-device) TT, or remote
  /// successors without device-aware MPI support.
  /// @param remote_check callable returning true iff at least one successor is remote
3492  template <std::size_t i, typename Value, typename RemoteCheckFn>
3493  std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3494  void>
3495  do_prepare_send(const Value &value, RemoteCheckFn&& remote_check) {
3496  constexpr const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
3497 
3498  /* get the copy */
3501 
3502  /* if there is no copy we don't need to prepare anything */
3503  if (nullptr == copy) {
3504  return;
3505  }
3506 
3508  bool need_pushout = false;
3509 
3511  /* already marked pushout, skip the rest */
3512  return;
3513  }
3514 
3515  /* TODO: remove this once we support reductions on the GPU */
3516  auto &reducer = std::get<i>(input_reducers);
3517  if (reducer) {
3518  /* reductions are currently done only on the host so push out */
3519  copy_mark_pushout(value);
3521  return;
3522  }
3523 
3524  if constexpr (value_is_const) {
3526  /* The data has been modified previously. If not all devices can access
3527  * their peers then we need to push out to the host so that all devices
3528  * have the data available for reading.
3529  * NOTE: we currently don't allow users to force the next writer to be
3530  * on a different device. In that case PaRSEC would take the host-side
3531  * copy. If we change our restriction we need to revisit this.
3532  * Ideally, PaRSEC would take the device copy if the owner moves... */
3533  need_pushout = !detail::all_devices_peer_access;
3534  }
3535 
3536  /* check for multiple readers */
3539  }
3540 
3542  /* there is a writer already, we will need to create a copy */
3543  need_pushout = true;
3544  }
3545 
3547  } else {
3550  need_pushout = true;
3551  } else {
3553  /* there are readers, we will need to create a copy */
3554  need_pushout = true;
3555  }
3557  }
3558  }
3559 
  // TTs without any device op always run on the host, so data must come back
3560  if constexpr (!derived_has_device_op()) {
3561  need_pushout = true;
3562  }
3563 
3564  /* check if there are non-local successors if it's a device task */
3565  if (!need_pushout) {
3566  bool device_supported = false;
3567  if constexpr (derived_has_cuda_op()) {
3568  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::CUDA);
3569  } else if constexpr (derived_has_hip_op()) {
3570  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::HIP);
3571  } else if constexpr (derived_has_level_zero_op()) {
3572  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::L0);
3573  }
3574  /* if MPI supports the device we don't care whether we have remote peers
3575  * because we can send from the device directly */
3576  if (!device_supported) {
3577  need_pushout = remote_check();
3578  }
3579  }
3580 
3581  if (need_pushout) {
3582  copy_mark_pushout(value);
3584  }
3585  }
3586 
3587  /* check whether a data needs to be pushed out */
3588  template <std::size_t i, typename Key, typename Value>
3589  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3590  void>
3591  prepare_send(const ttg::span<const Key> &keylist, const Value &value) {
3592  auto remote_check = [&](){
3593  auto world = ttg_default_execution_context();
3594  int rank = world.rank();
3595  bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3596  [&](const Key &key) { return keymap(key) != rank; });
3597  return remote;
3598  };
3599  do_prepare_send<i>(value, remote_check);
3600  }
3601 
3602  template <std::size_t i, typename Key, typename Value>
3603  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3604  void>
3605  prepare_send(const Value &value) {
3606  auto remote_check = [&](){
3607  auto world = ttg_default_execution_context();
3608  int rank = world.rank();
3609  return (keymap() != rank);
3610  };
3611  do_prepare_send<i>(value, remote_check);
3612  }
3613 
3614  private:
  // TT instances are pinned: neither copyable nor movable (see rationale below).
3615  // Copy/assign/move forbidden ... we could make it work using
3616  // PIMPL for this base class. However, this instance of the base
3617  // class is tied to a specific instance of a derived class a
3618  // pointer to which is captured for invoking derived class
3619  // functions. Thus, not only does the derived class has to be
3620  // involved but we would have to do it in a thread safe way
3621  // including for possibly already running tasks and remote
3622  // references. This is not worth the effort ... wherever you are
3623  // wanting to move/assign an TT you should be using a pointer.
3624  TT(const TT &other) = delete;
3625  TT &operator=(const TT &other) = delete;
3626  TT(TT &&other) = delete;
3627  TT &operator=(TT &&other) = delete;
3628 
3629  // Registers the callback for the i'th input terminal
  // Dispatches at compile time on (key-ness, value-ness) to install the proper
  // set of send/move/broadcast/setsize/finalize/prepare-send callbacks.
3630  template <typename terminalT, std::size_t i>
3631  void register_input_callback(terminalT &input) {
3632  using valueT = std::decay_t<typename terminalT::value_type>;
3633  if (input.is_pull_terminal) {
3634  num_pullins++;
3635  }
3637  // case 1: nonvoid key, nonvoid value
3639  if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3640  auto move_callback = [this](const keyT &key, valueT &&value) {
3641  set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3642  };
3643  auto send_callback = [this](const keyT &key, const valueT &value) {
3644  set_arg<i, keyT, const valueT &>(key, value);
3645  };
3646  auto broadcast_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
3647  broadcast_arg<i, keyT, valueT>(keylist, value);
3648  };
3649  auto prepare_send_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
3650  prepare_send<i, keyT, valueT>(keylist, value);
3651  };
3652  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
3653  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
3654  input.set_callback(send_callback, move_callback, broadcast_callback,
3655  setsize_callback, finalize_callback, prepare_send_callback);
3656  }
3658  // case 2: nonvoid key, void value, mixed inputs
3660  else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
  // control-flow-only input: send and move collapse to the same callback
3661  auto send_callback = [this](const keyT &key) { set_arg<i, keyT, ttg::Void>(key, ttg::Void{}); };
3662  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
3663  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
3664  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3665  }
3667  // case 3: nonvoid key, void value, no inputs
3668  // NOTE: subsumed in case 2 above, kept for historical reasons
3671  // case 4: void key, nonvoid value
3673  else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3674  auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3675  auto send_callback = [this](const valueT &value) {
3676  if constexpr (std::is_copy_constructible_v<valueT>) {
3677  set_arg<i, keyT, const valueT &>(value);
3678  }
3679  else {
  // non-copyable datum sent by lvalue: user must std::move it instead
3680  throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + typeid(std::decay_t<valueT>).name() + " which is not copy constructible, std::move datum into send/broadcast statement");
3681  }
3682  };
3683  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
3684  auto finalize_callback = [this]() { finalize_argstream<i>(); };
3685  auto prepare_send_callback = [this](const valueT &value) {
3686  prepare_send<i, void>(value);
3687  };
3688  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3689  }
3691  // case 5: void key, void value, mixed inputs
3693  else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3694  auto send_callback = [this]() { set_arg<i, keyT, ttg::Void>(ttg::Void{}); };
3695  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
3696  auto finalize_callback = [this]() { finalize_argstream<i>(); };
3697  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3698  }
3700  // case 6: void key, void value, no inputs
3701  // NOTE: subsumed in case 5 above, kept for historical reasons
3703  else
3704  ttg::abort();
3705  }
3706 
3707  template <std::size_t... IS>
3708  void register_input_callbacks(std::index_sequence<IS...>) {
3709  int junk[] = {
3710  0,
3711  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3712  0)...};
3713  junk[0]++;
3714  }
3715 
3716  template <std::size_t... IS, typename inedgesT>
3717  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3718  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3719  junk[0]++;
3720  }
3721 
3722  template <std::size_t... IS, typename outedgesT>
3723  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3724  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3725  junk[0]++;
3726  }
3727 
  // Disabled code, kept for reference: it set each input flow's access flags
  // (READ for const terminals, RW otherwise); flow setup is now done in the
  // TT constructor.
3728 #if 0
3729  template <typename input_terminals_tupleT, std::size_t... IS, typename flowsT>
3730  void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3731  int junk[] = {0,
3732  (*(const_cast<std::remove_const_t<decltype(flows[IS]->flow_flags)> *>(&(flows[IS]->flow_flags))) =
3733  (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3734  : PARSEC_FLOW_ACCESS_RW),
3735  0)...};
3736  junk[0]++;
3737  }
3738 
3739  template <typename input_terminals_tupleT, typename flowsT>
3740  void initialize_flows(flowsT &&flows) {
3741  _initialize_flows<input_terminals_tupleT>(
3742  std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
3743  }
3744 #endif // 0
3745 
3746  void fence() override { ttg::default_execution_context().impl().fence(); }
3747 
3748  static int key_equal(parsec_key_t a, parsec_key_t b, void *user_data) {
3749  if constexpr (std::is_same_v<keyT, void>) {
3750  return 1;
3751  } else {
3752  keyT &ka = *(reinterpret_cast<keyT *>(a));
3753  keyT &kb = *(reinterpret_cast<keyT *>(b));
3754  return ka == kb;
3755  }
3756  }
3757 
3758  static uint64_t key_hash(parsec_key_t k, void *user_data) {
3759  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3760  if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3761  return 0;
3762  } else {
3763  keyT &kk = *(reinterpret_cast<keyT *>(k));
3764  using ttg::hash;
3765  uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3766  return hv;
3767  }
3768  }
3769 
3770  static char *key_print(char *buffer, size_t buffer_size, parsec_key_t k, void *user_data) {
3771  if constexpr (std::is_same_v<keyT, void>) {
3772  buffer[0] = '\0';
3773  return buffer;
3774  } else {
3775  keyT kk = *(reinterpret_cast<keyT *>(k));
3776  std::stringstream iss;
3777  iss << kk;
3778  memset(buffer, 0, buffer_size);
3779  iss.get(buffer, buffer_size);
3780  return buffer;
3781  }
3782  }
3783 
3784  static parsec_key_t make_key(const parsec_taskpool_t *tp, const parsec_assignment_t *as) {
3785  // we use the parsec_assignment_t array as a scratchpad to store the hash and address of the key
3786  keyT *key = *(keyT**)&(as[2]);
3787  return reinterpret_cast<parsec_key_t>(key);
3788  }
3789 
3790  static char *parsec_ttg_task_snprintf(char *buffer, size_t buffer_size, const parsec_task_t *parsec_task) {
3791  if(buffer_size == 0)
3792  return buffer;
3793 
3794  if constexpr (ttg::meta::is_void_v<keyT>) {
3795  snprintf(buffer, buffer_size, "%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3796  } else {
3797  const task_t *task = reinterpret_cast<const task_t*>(parsec_task);
3798  std::stringstream ss;
3799  ss << task->key;
3800 
3801  std::string keystr = ss.str();
3802  std::replace(keystr.begin(), keystr.end(), '(', ':');
3803  std::replace(keystr.begin(), keystr.end(), ')', ':');
3804 
3805  snprintf(buffer, buffer_size, "%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3806  }
3807  return buffer;
3808  }
3809 
#if defined(PARSEC_PROF_TRACE)
  // Profiling callback: writes the task's key (or "()" for void keys) into dst.
  static void *parsec_ttg_task_info(void *dst, const void *data, size_t size)
  {
    if constexpr (ttg::meta::is_void_v<keyT>) {
      snprintf(reinterpret_cast<char*>(dst), size, "()");
    } else {
      const task_t *task = reinterpret_cast<const task_t *>(data);
      std::stringstream ss;
      ss << task->key;
      snprintf(reinterpret_cast<char*>(dst), size, "%s", ss.str().c_str());
    }
    return dst;
  }
#endif
3825 
  // key-function vtable (equality, print, hash) registered with the PaRSEC hash tables
3826  parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3827 
  /// PaRSEC completion hook: finishes a TTG task after its body has run.
  /// Resumes a still-suspended device coroutine once to flush its deferred
  /// sends, releases the task's data copies, and fires any completion
  /// constraints registered on the TT.
  /// @return PARSEC_HOOK_RETURN_DONE always
3828  static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3829 
3830  //std::cout << "complete_task_and_release: task " << parsec_task << std::endl;
3831 
3832  task_t *task = (task_t*)parsec_task;
3833 
3834 #ifdef TTG_HAVE_COROUTINE
3835  /* if we still have a coroutine handle we invoke it one more time to get the sends/broadcasts */
3836  if (task->suspended_task_address) {
3837  assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
3838 #ifdef TTG_HAVE_DEVICE
3839  if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
3840  // get the device task from the coroutine handle
3841  auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3842 
3843  // get the promise which contains the views
3844  auto dev_data = dev_task.promise();
3845 
3846  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3847  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3848 
3849  /* execute the sends we stored */
3850  if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3851  /* set the current task, needed inside the sends */
3853  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3854  task->tt->set_outputs_tls_ptr();
3855  dev_data.do_sends(); // all sends happen here
  // restore the previous TLS output pointer and clear the current-task marker
3856  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3857  detail::parsec_ttg_caller = nullptr;
3858  }
3859  dev_task.destroy(); // safe to destroy the coroutine now
3860  }
3861 #endif // TTG_HAVE_DEVICE
3862  /* the coroutine should have completed and we cannot access the promise anymore */
3863  task->suspended_task_address = nullptr;
3864  }
3865 #endif // TTG_HAVE_COROUTINE
3866 
3867  /* release our data copies */
3868  for (int i = 0; i < task->data_count; i++) {
3869  detail::ttg_data_copy_t *copy = task->copies[i];
3870  if (nullptr == copy) continue;
3872  task->copies[i] = nullptr;
3873  }
3874 
  // notify completion constraints (keyed TTs pass the task's key)
3875  for (auto& c : task->tt->constraints_complete) {
3876  if constexpr(std::is_void_v<keyT>) {
3877  c();
3878  } else {
3879  c(task->key);
3880  }
3881  }
3882  return PARSEC_HOOK_RETURN_DONE;
3883  }
3884 
3885  public:
  /// Main constructor: builds the TT, registers it with the world's taskpool,
  /// wires input/output terminals and their callbacks, fills in the PaRSEC
  /// task class (self), allocates the flow descriptors, and initializes the
  /// task mempool and the task/constraint hash tables.
  /// @param name      human-readable TT name (also used as PaRSEC task-class name)
  /// @param innames   one name per input terminal (must match numinedges)
  /// @param outnames  one name per output terminal (must match numouts)
  /// @param world     the ttg::World this TT belongs to
  /// @param keymap_   maps a key to the owning process rank
  /// @param priomap_  maps a key to a task priority
  /// @throws std::logic_error if the name counts do not match the terminal counts
3886  template <typename keymapT = ttg::detail::default_keymap<keyT>,
3887  typename priomapT = ttg::detail::default_priomap<keyT>>
3888  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
3889  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3890  : ttg::TTBase(name, numinedges, numouts)
3891  , world(world)
3892  // if using default keymap, rebind to the given world
3893  , keymap(std::is_same<keymapT, ttg::detail::default_keymap<keyT>>::value
3894  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
3895  : decltype(keymap)(std::forward<keymapT>(keymap_)))
  // NOTE(review): priomap is wrapped in decltype(keymap) rather than
  // decltype(priomap) — works only if both wrappers share a signature; confirm intended
3896  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3897  // Cannot call these in base constructor since terminals not yet constructed
3898  if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals");
3899  if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals");
3900 
3901  auto &world_impl = world.impl();
3902  world_impl.register_op(this);
3903 
3904  if constexpr (numinedges == numins) {
3905  register_input_terminals(input_terminals, innames);
3906  } else {
3907  // create a name for the virtual control input
3908  register_input_terminals(input_terminals, std::array<std::string, 1>{std::string("Virtual Control")});
3909  }
3910  register_output_terminals(output_terminals, outnames);
3911 
3912  register_input_callbacks(std::make_index_sequence<numinedges>{});
3913  int i;
3914 
  // start from a zeroed PaRSEC task class and fill in only what TTG uses
3915  memset(&self, 0, sizeof(parsec_task_class_t));
3916 
3917  self.name = strdup(get_name().c_str());
3918  self.task_class_id = get_instance_id();
3919  self.nb_parameters = 0;
3920  self.nb_locals = 0;
3921  //self.nb_flows = numflows;
3922  self.nb_flows = MAX_PARAM_COUNT; // we're not using all flows but have to
3923  // trick the device handler into looking at all of them
3924 
3925  if( world_impl.profiling() ) {
3926  // first two ints are used to store the hash of the key.
3927  self.nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3928  // seconds two ints are used to store a pointer to the key of the task.
3929  self.nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3930 
3931  // If we have parameters and locals, we need to define the corresponding dereference arrays
3932  self.params[0] = &detail::parsec_taskclass_param0;
3933  self.params[1] = &detail::parsec_taskclass_param1;
3934 
3935  self.locals[0] = &detail::parsec_taskclass_param0;
3936  self.locals[1] = &detail::parsec_taskclass_param1;
3937  self.locals[2] = &detail::parsec_taskclass_param2;
3938  self.locals[3] = &detail::parsec_taskclass_param3;
3939  }
3940  self.make_key = make_key;
3941  self.key_functions = &tasks_hash_fcts;
3942  self.task_snprintf = parsec_ttg_task_snprintf;
3943 
3944 #if defined(PARSEC_PROF_TRACE)
3945  self.profile_info = &parsec_ttg_task_info;
3946 #endif
3947 
  // grow the taskpool's task-class count to cover this class id
3948  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
3949  // function_id_to_instance[self.task_class_id] = this;
3950  //self.incarnations = incarnations_array.data();
3951 //#if 0
  // install the incarnation (chore) list: the device hook when the derived TT
  // has one (CUDA/HIP/Level Zero), otherwise the host hook; the list is
  // terminated by a PARSEC_DEV_NONE entry
3952  if constexpr (derived_has_cuda_op()) {
3953  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3954  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
3955  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3956  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3957  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3958  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3959  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3960  } else if constexpr (derived_has_hip_op()) {
3961  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3962  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP;
3963  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3964  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_hip<TT>;
3965 
3966  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3967  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3968  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3969 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3970  } else if constexpr (derived_has_level_zero_op()) {
3971  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3972  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3973  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3974  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3975 
3976  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3977  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3978  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3979 #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
3980  } else {
3981  self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
3982  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CPU;
3983  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
3984  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook<TT>;
3985  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3986  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3987  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3988  }
3989 //#endif // 0
3990 
3991  self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3992  self.complete_execution = complete_task_and_release;
3993 
  // allocate the input flow descriptors (owned by this TT; names are strdup'ed)
3994  for (i = 0; i < MAX_PARAM_COUNT; i++) {
3995  parsec_flow_t *flow = new parsec_flow_t;
3996  flow->name = strdup((std::string("flow in") + std::to_string(i)).c_str());
3997  flow->sym_type = PARSEC_SYM_INOUT;
3998  // see initialize_flows below
3999  // flow->flow_flags = PARSEC_FLOW_ACCESS_RW;
4000  flow->dep_in[0] = NULL;
4001  flow->dep_out[0] = NULL;
4002  flow->flow_index = i;
4003  flow->flow_datatype_mask = ~0;
4004  *((parsec_flow_t **)&(self.in[i])) = flow;
4005  }
4006  //*((parsec_flow_t **)&(self.in[i])) = NULL;
4007  //initialize_flows<input_terminals_type>(self.in);
4008 
4009  for (i = 0; i < MAX_PARAM_COUNT; i++) {
4010  parsec_flow_t *flow = new parsec_flow_t;
4011  flow->name = strdup((std::string("flow out") + std::to_string(i)).c_str());
4012  flow->sym_type = PARSEC_SYM_INOUT;
4013  flow->flow_flags = PARSEC_FLOW_ACCESS_READ; // does PaRSEC use this???
4014  flow->dep_in[0] = NULL;
4015  flow->dep_out[0] = NULL;
4016  flow->flow_index = i;
4017  flow->flow_datatype_mask = (1 << i);
4018  *((parsec_flow_t **)&(self.out[i])) = flow;
4019  }
4020  //*((parsec_flow_t **)&(self.out[i])) = NULL;
4021 
4022  self.flags = 0;
4023  self.dependencies_goal = numins; /* (~(uint32_t)0) >> (32 - numins); */
4024 
  // size the per-thread task mempool by the total core count across virtual processes
4025  int nbthreads = 0;
4026  auto *context = world_impl.context();
4027  for (int i = 0; i < context->nb_vp; i++) {
4028  nbthreads += context->virtual_processes[i]->nb_cores;
4029  }
4030 
4031  parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t), sizeof(task_t),
4032  offsetof(parsec_task_t, mempool_owner), nbthreads);
4033 
4034  parsec_hash_table_init(&tasks_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
4035  NULL);
4036 
4037  parsec_hash_table_init(&task_constraint_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
4038  NULL);
4039  }
4040 
  /// Constructs a TT in the default execution context.
  /// Delegates to the primary constructor, passing ttg::default_execution_context() as the world.
  /// @param name human-readable name of this TT
  /// @param innames names for the input terminals
  /// @param outnames names for the output terminals
  /// @param keymap keymap functor (defaulted for keyT)
  /// @param priomap priority-map functor (defaulted for keyT)
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
           std::forward<priomapT>(priomap)) {}
4047 
  /// Constructs a TT in the given world and connects it to the given edges.
  /// Edges are connected first; input callbacks are registered afterwards
  /// because the number of pull terminals is only known once edges are wired.
  /// @param inedges edges feeding the input terminals
  /// @param outedges edges fed by the output terminals
  /// @param name human-readable name of this TT
  /// @param innames names for the input terminals
  /// @param outnames names for the output terminals
  /// @param world the world this TT executes in
  /// @param keymap_ keymap functor (defaulted)
  /// @param priomap priority-map functor (defaulted)
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
     keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
    connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
    connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
    // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
    if constexpr (numinedges > 0) {
      register_input_callbacks(std::make_index_sequence<numinedges>{});
    }
  }
  /// Constructs a TT in the default execution context and connects it to the given edges.
  /// Delegates to the edge-connecting constructor with ttg::default_execution_context().
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
           std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4068 
4069  // Destructor checks for unexecuted tasks
4070  virtual ~TT() {
4071  if(nullptr != self.name ) {
4072  free((void*)self.name);
4073  self.name = nullptr;
4074  }
4075 
4076  for (std::size_t i = 0; i < numins; ++i) {
4077  if (inpute_reducers_taskclass[i] != nullptr) {
4078  std::free(inpute_reducers_taskclass[i]);
4079  inpute_reducers_taskclass[i] = nullptr;
4080  }
4081  }
4082  release();
4083  }
4084 
4085  static void ht_iter_cb(void *item, void *cb_data) {
4086  task_t *task = (task_t *)item;
4087  ttT *op = (ttT *)cb_data;
4088  if constexpr (!ttg::meta::is_void_v<keyT>) {
4089  std::cout << "Left over task " << op->get_name() << " " << task->key << std::endl;
4090  } else {
4091  std::cout << "Left over task " << op->get_name() << std::endl;
4092  }
4093  }
4094 
  // NOTE(review): the enclosing signature (void print_incomplete_tasks()) is
  // not visible in this extraction; the line below is its body.
  // Walk the outstanding-task hash table, printing each entry via ht_iter_cb.
  parsec_hash_table_for_all(&tasks_table, ht_iter_cb, this);
  }
4098 
4099  virtual void release() override { do_release(); }
4100 
  /// Tears down all PaRSEC state owned by this TT: the task hash table, the
  /// task mempool, the incarnations array, and the in/out flow descriptors.
  /// Idempotent: guarded by the `alive` flag so only the first call releases.
  void do_release() {
    if (!alive) {
      return;
    }
    alive = false;
    /* print all outstanding tasks */
    // NOTE(review): a statement is missing here in this extraction (likely the
    // call that prints incomplete tasks) -- confirm against the full source.
    parsec_hash_table_fini(&tasks_table);
    parsec_mempool_destruct(&mempools);
    // uintptr_t addr = (uintptr_t)self.incarnations;
    // free((void *)addr);
    // incarnations was heap-allocated (malloc); cast drops const for free()
    free((__parsec_chore_t *)self.incarnations);
    // flows were created with `new` and their names with strdup (see ctor)
    for (int i = 0; i < MAX_PARAM_COUNT; i++) {
      if (NULL != self.in[i]) {
        free(self.in[i]->name);
        delete self.in[i];
        self.in[i] = nullptr;
      }
      if (NULL != self.out[i]) {
        free(self.out[i]->name);
        delete self.out[i];
        self.out[i] = nullptr;
      }
    }
    world.impl().deregister_op(this);
  }
4127 
4128  static constexpr const ttg::Runtime runtime = ttg::Runtime::PaRSEC;
4129 
4135  template <std::size_t i, typename Reducer>
4136  void set_input_reducer(Reducer &&reducer) {
4137  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
4138  std::get<i>(input_reducers) = reducer;
4139 
4140  parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4141  if (nullptr == tc) {
4142  tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc));
4143  inpute_reducers_taskclass[i] = tc;
4144 
4145  tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str());
4146  tc->task_class_id = get_instance_id();
4147  tc->nb_parameters = 0;
4148  tc->nb_locals = 0;
4149  tc->nb_flows = numflows;
4150 
4151  auto &world_impl = world.impl();
4152 
4153  if( world_impl.profiling() ) {
4154  // first two ints are used to store the hash of the key.
4155  tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4156  // seconds two ints are used to store a pointer to the key of the task.
4157  tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4158 
4159  // If we have parameters and locals, we need to define the corresponding dereference arrays
4160  tc->params[0] = &detail::parsec_taskclass_param0;
4161  tc->params[1] = &detail::parsec_taskclass_param1;
4162 
4163  tc->locals[0] = &detail::parsec_taskclass_param0;
4164  tc->locals[1] = &detail::parsec_taskclass_param1;
4165  tc->locals[2] = &detail::parsec_taskclass_param2;
4166  tc->locals[3] = &detail::parsec_taskclass_param3;
4167  }
4168  tc->make_key = make_key;
4169  tc->key_functions = &tasks_hash_fcts;
4170  tc->task_snprintf = parsec_ttg_task_snprintf;
4171 
4172 #if defined(PARSEC_PROF_TRACE)
4173  tc->profile_info = &parsec_ttg_task_info;
4174 #endif
4175 
4176  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
4177 
4178 #if 0
4179  // FIXME: currently only support reduction on the host
4180  if constexpr (derived_has_cuda_op()) {
4181  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
4182  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
4183  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
4184  ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda;
4185  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU;
4186  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
4187  ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook;
4188  ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE;
4189  ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL;
4190  ((__parsec_chore_t *)self.incarnations)[2].hook = NULL;
4191  } else
4192 #endif // 0
4193  {
4194  tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
4195  ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4196  ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4197  ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4198  ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4199  ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4200  ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4201  }
4202 
4203  /* the reduction task does not alter the termination detection because the target task will execute */
4204  tc->release_task = &parsec_release_task_to_mempool;
4205  tc->complete_execution = NULL;
4206  }
4207  }
4208 
4216  template <std::size_t i, typename Reducer>
4217  void set_input_reducer(Reducer &&reducer, std::size_t size) {
4218  set_input_reducer<i>(std::forward<Reducer>(reducer));
4219  set_static_argstream_size<i>(size);
4220  }
4221 
  // Returns a pointer to input terminal i to facilitate connection --- terminals
  // cannot be copied, moved or assigned, hence access by pointer.
  template <std::size_t i>
  std::tuple_element_t<i, input_terminals_type> *in() {
    return &std::get<i>(input_terminals);
  }
4228 
  // Returns a pointer to output terminal i for purpose of connection --- terminals
  // cannot be copied, moved or assigned, hence access by pointer.
  template <std::size_t i>
  std::tuple_element_t<i, output_terminalsT> *out() {
    return &std::get<i>(output_terminals);
  }
4235 
  // Manual injection of a task with all input arguments specified as a tuple.
  // If Key is not the TT's key_type it is converted first; otherwise non-void
  // inputs are set from `args` and void inputs are triggered separately.
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, const input_values_tuple_type &args) {
    // NOTE(review): a statement (likely TTG_OP_ASSERT_EXECUTABLE()) is missing
    // here in this extraction -- confirm against the full source.
    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k, args);
    } else {
      /* trigger non-void inputs */
      set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4252 
  // Manual injection of a key-free task with all input arguments specified as a tuple.
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const input_values_tuple_type &args) {
    // NOTE(review): a statement (likely TTG_OP_ASSERT_EXECUTABLE()) is missing
    // here in this extraction -- confirm against the full source.
    /* trigger non-void inputs */
    set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4264 
  // Manual injection of a task that has no (non-void) arguments, only a key.
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key) {
    // NOTE(review): a statement (likely TTG_OP_ASSERT_EXECUTABLE()) is missing
    // here in this extraction -- confirm against the full source.

    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k);
    } else {
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4280 
  // Manual injection of a task that has no key and no (non-void) arguments.
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
    // NOTE(review): a statement (likely TTG_OP_ASSERT_EXECUTABLE()) is missing
    // here in this extraction -- confirm against the full source.
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4289 
4290  // overrides TTBase::invoke()
4291  void invoke() override {
4292  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4293  invoke<keyT>();
4294  else
4295  TTBase::invoke();
4296  }
4297 
4298  private:
4299  template<typename Key, typename Arg, typename... Args, std::size_t I, std::size_t... Is>
4300  void invoke_arglist(std::index_sequence<I, Is...>, const Key& key, Arg&& arg, Args&&... args) {
4301  using arg_type = std::decay_t<Arg>;
4302  if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4303  /* add a reference to the object */
4304  auto copy = ttg_parsec::detail::get_copy(arg);
4305  copy->add_ref();
4306  /* reset readers so that the value can flow without copying */
4307  copy->reset_readers();
4308  auto& val = *arg;
4309  set_arg_impl<I>(key, val, copy);
4311  if constexpr (std::is_rvalue_reference_v<Arg>) {
4312  /* if the ptr was moved in we reset it */
4313  arg.reset();
4314  }
4315  } else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4316  set_arg<I>(key, std::forward<Arg>(arg));
4317  }
4318  if constexpr (sizeof...(Is) > 0) {
4319  /* recursive next argument */
4320  invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4321  }
4322  }
4323 
4324  public:
  // Manual injection of a task with all input arguments specified as variadic
  // arguments; the argument count must match the number of task inputs.
  template <typename Key = keyT, typename Arg, typename... Args>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, Arg&& arg, Args&&... args) {
    static_assert(sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
                  "Number of arguments to invoke must match the number of task inputs.");
    // NOTE(review): a statement (likely TTG_OP_ASSERT_EXECUTABLE()) is missing
    // here in this extraction -- confirm against the full source.
    /* trigger non-void inputs */
    invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
                   std::forward<Arg>(arg), std::forward<Args>(args)...);
    //set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4340 
4341  void set_defer_writer(bool value) {
4342  m_defer_writer = value;
4343  }
4344 
4345  bool get_defer_writer(bool value) {
4346  return m_defer_writer;
4347  }
4348 
4349  public:
  /// Marks this TT executable; registers it with the world's profiling support.
  void make_executable() override {
    world.impl().register_tt_profiling(this);
    // NOTE(review): statements are missing here in this extraction (likely the
    // call to the base-class make_executable()) -- confirm against full source.
  }
4355 
4358  const decltype(keymap) &get_keymap() const { return keymap; }
4359 
4361  template <typename Keymap>
4362  void set_keymap(Keymap &&km) {
4363  keymap = km;
4364  }
4365 
4368  const decltype(priomap) &get_priomap() const { return priomap; }
4369 
4372  template <typename Priomap>
4373  void set_priomap(Priomap &&pm) {
4374  priomap = std::forward<Priomap>(pm);
4375  }
4376 
  /// Sets the device map used to pick the device a given key's task runs on.
  /// If @p dm already returns a ttg::device::Device it is stored directly;
  /// otherwise it is wrapped in a lambda that converts its result.
  template<typename Devicemap>
  void set_devicemap(Devicemap&& dm) {
    //static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!");
    if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
      // dm returns a Device
      devicemap = std::forward<Devicemap>(dm);
    } else {
      // convert dm return into a Device
      devicemap = [=](const keyT& key) {
        if constexpr (derived_has_cuda_op()) {
          // NOTE(review): return statement missing in this extraction
        } else if constexpr (derived_has_hip_op()) {
          // NOTE(review): return statement missing in this extraction
        } else if constexpr (derived_has_level_zero_op()) {
          // NOTE(review): return statement missing in this extraction
        } else {
          throw std::runtime_error("Unknown device type!");
          return ttg::device::Device{};
        }
      };
    }
  }
4404 
4407  auto get_devicemap() { return devicemap; }
4408 
4411  template<typename Constraint>
4412  void add_constraint(std::shared_ptr<Constraint> c) {
4413  std::size_t cid = constraints_check.size();
4414  if constexpr(ttg::meta::is_void_v<keyT>) {
4415  c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
4416  constraints_check.push_back([c, this](){ return c->check(this); });
4417  constraints_complete.push_back([c, this](const keyT& key){ c->complete(this); return true; });
4418  } else {
4419  c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
4420  constraints_check.push_back([c, this](const keyT& key){ return c->check(key, this); });
4421  constraints_complete.push_back([c, this](const keyT& key){ c->complete(key, this); return true; });
4422  }
4423  }
4424 
4427  template<typename Constraint>
4428  void add_constraint(Constraint&& c) {
4429  // need to make this a shared_ptr since it's shared between different callbacks
4430  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4431  }
4432 
  /// Adds a constraint with a mapper: @p map produces, from a key, the value
  /// that is handed to the constraint's check/complete calls.
  /// @param c the constraint; shared between the registered callbacks
  /// @param map callable mapping a key (or nothing, for void keys) to the
  ///            value passed on to the constraint
  template<typename Constraint, typename Mapper>
  void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
    static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
    std::size_t cid = constraints_check.size();
    if constexpr(ttg::meta::is_void_v<keyT>) {
      c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
      constraints_check.push_back([map, c, this](){ return c->check(map(), this); });
      constraints_complete.push_back([map, c, this](){ c->complete(map(), this); return true; });
    } else {
      c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
      constraints_check.push_back([map, c, this](const keyT& key){ return c->check(key, map(key), this); });
      constraints_complete.push_back([map, c, this](const keyT& key){ c->complete(key, map(key), this); return true; });
    }
  }
4450 
4454  template<typename Constraint, typename Mapper>
4455  void add_constraint(Constraint c, Mapper&& map) {
4456  // need to make this a shared_ptr since it's shared between different callbacks
4457  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4458  }
4459 
  // Register the static_op function to associate it to instance_id
  // NOTE(review): the function signature (register_static_op_function) is not
  // visible in this extraction; the lines below form its body. It publishes
  // this TT's static_set_arg under its instance id and then replays any
  // messages that arrived before this TT was registered.
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  ttg::trace("ttg_parsec(", rank, ") Inserting into static_id_to_op_map at ", get_instance_id());
  static_set_arg_fct_call_t call = std::make_pair(&TT::static_set_arg, this);
  auto &world_impl = world.impl();
  static_map_mutex.lock();
  static_id_to_op_map.insert(std::make_pair(get_instance_id(), call));
  if (delayed_unpack_actions.count(get_instance_id()) > 0) {
    auto tp = world_impl.taskpool();

    ttg::trace("ttg_parsec(", rank, ") There are ", delayed_unpack_actions.count(get_instance_id()),
               " messages delayed with op_id ", get_instance_id());

    // drain all delayed messages for this op id while holding the lock
    auto se = delayed_unpack_actions.equal_range(get_instance_id());
    std::vector<static_set_arg_fct_arg_t> tmp;
    for (auto it = se.first; it != se.second;) {
      assert(it->first == get_instance_id());
      tmp.push_back(it->second);
      it = delayed_unpack_actions.erase(it);
    }
    static_map_mutex.unlock();

    // replay the delayed messages outside of the lock
    for (auto it : tmp) {
      if(ttg::tracing())
        ttg::print("ttg_parsec(", rank, ") Unpacking delayed message (", ", ", get_instance_id(), ", ",
                   std::get<1>(it), ", ", std::get<2>(it), ")");
      int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
                                         std::get<0>(it), NULL);
      assert(rc == 0);
      free(std::get<1>(it));
    }

    tmp.clear();
  } else {
    static_map_mutex.unlock();
  }
  }
4499  };
4500 
4501 #include "ttg/make_tt.h"
4502 
4503 } // namespace ttg_parsec
4504 
4510 template <>
4512  private:
4513  ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;
4514  bool do_release = true;
4515 
4516  public:
4517  value_copy_handler() = default;
4520  : copy_to_remove(h.copy_to_remove)
4521  {
4522  h.copy_to_remove = nullptr;
4523  }
4524 
4527  {
4528  std::swap(copy_to_remove, h.copy_to_remove);
4529  return *this;
4530  }
4531 
4533  if (nullptr != copy_to_remove) {
4535  if (do_release) {
4536  ttg_parsec::detail::release_data_copy(copy_to_remove);
4537  }
4538  }
4539  }
4540 
  /// Registers @p value with the calling task's data-copy tracking and returns
  /// a reference (moved when the input was an rvalue) to the tracked storage.
  /// @throws std::runtime_error when called outside of a task
  template <typename Value>
  inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&> operator()(Value &&value) {
    constexpr auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
    using value_type = std::remove_reference_t<Value>;
    static_assert(value_is_rvref ||
                  std::is_copy_constructible_v<std::decay_t<Value>>,
                  "Data sent without being moved must be copy-constructible!");

    // NOTE(review): the declaration of `caller` (the current task) is missing
    // in this extraction -- confirm against the full source.
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }

    // NOTE(review): the declaration of `copy` is missing in this extraction.
    copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
    value_type *value_ptr = &value;
    if (nullptr == copy) {
      // no copy registered with the task yet: create and register one
      copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      value_ptr = reinterpret_cast<value_type *>(copy->get_ptr());
      copy_to_remove = copy;
    } else {
      if constexpr (value_is_rvref) {
        /* this copy won't be modified anymore so mark it as read-only */
        copy->reset_readers();
      }
    }
    /* We're coming from a writer so mark the data as modified.
     * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). */
    // NOTE(review): the statement implementing the above is missing in this
    // extraction -- confirm against the full source.
    if constexpr (value_is_rvref)
      return std::move(*value_ptr);
    else
      return *value_ptr;
  }
4581 
  /// Handles a persistent value reference: reuses the value's own data copy
  /// (no new allocation) and registers it with the calling task.
  /// @throws std::runtime_error when called outside of a task
  template<typename Value>
  inline std::add_lvalue_reference_t<Value> operator()(ttg_parsec::detail::persistent_value_ref<Value> vref) {
    // NOTE(review): the declaration of `caller` is missing in this extraction.
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }
    // NOTE(review): the declaration of `copy` is missing in this extraction.
    copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref);
    if (nullptr == copy) {
      // no need to create a new copy since it's derived from the copy already
      copy = const_cast<ttg_parsec::detail::ttg_data_copy_t *>(static_cast<const ttg_parsec::detail::ttg_data_copy_t *>(&vref.value_ref));
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      copy_to_remove = copy; // we want to remove the copy from the task once done sending
      // NOTE(review): the flag is set to true, i.e. the destructor WILL call
      // release_data_copy; the original comment claimed the opposite ("we
      // don't release the copy since we didn't allocate it") -- verify intent.
      do_release = true;
      copy->add_ref(); // add a reference so that TTG does not attempt to delete this object
      copy->add_ref(); // add another reference so that TTG never attempts to free this copy
      if (copy->num_readers() == 0) {
        /* add at least one reader (the current task) */
        copy->increment_readers<false>();
      }
    }
    return vref.value_ref;
  }
4606 
  /// Const-lvalue overload: registers @p value with the calling task's
  /// data-copy tracking and returns a reference to the tracked storage.
  /// @throws std::runtime_error when called outside of a task
  template <typename Value>
  inline const Value &operator()(const Value &value) {
    // NOTE(review): the declaration of `caller` is missing in this extraction.
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }
    // NOTE(review): the declaration of `copy` is missing in this extraction.
    copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
    const Value *value_ptr = &value;
    if (nullptr == copy) {
      // NOTE(review): the statement creating a new data copy (assigning
      // `copy`) is missing in this extraction; as shown, `copy` would still
      // be null here -- confirm against the full source.
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      value_ptr = reinterpret_cast<Value *>(copy->get_ptr());
      copy_to_remove = copy;
    }
    return *value_ptr;
  }
4630 
4631 };
4632 
4633 #endif // PARSEC_TTG_H_INCLUDED
4634 // clang-format on
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:277
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:31
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
Definition: tt.h:187
auto get_instance_id() const
Definition: tt.h:259
virtual void make_executable()=0
Marks this executable.
Definition: tt.h:287
bool tracing() const
Definition: tt.h:178
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:218
TTBase(TTBase &&other)
Definition: tt.h:116
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:85
bool is_lazy_pull()
Definition: tt.h:200
const TTBase * ttg_ptr() const
Definition: tt.h:206
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:92
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
Represents a device in a specific execution space.
Definition: device.h:14
void print_incomplete_tasks()
Definition: ttg.h:4095
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:4455
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4063
ttg::World get_world() const override final
Definition: ttg.h:1369
static constexpr int numinvals
Definition: ttg.h:1281
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
Definition: ttg.h:1947
void set_priomap(Priomap &&pm)
Definition: ttg.h:4373
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:4136
std::tuple_element_t< i, input_terminals_type > * in()
Definition: ttg.h:4225
void set_devicemap(Devicemap &&dm)
Definition: ttg.h:4383
void set_keymap(Keymap &&km)
keymap setter
Definition: ttg.h:4362
void add_constraint(std::shared_ptr< Constraint > c)
Definition: ttg.h:4412
void finalize_argstream_from_msg(void *data, std::size_t size)
Definition: ttg.h:2258
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:4217
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
Definition: ttg.h:2964
bool check_constraints(task_t *task)
Definition: ttg.h:2601
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
Definition: ttg.h:3495
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:1272
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:2994
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
Definition: ttg.h:2731
task_t * create_new_task(const Key &key)
Definition: ttg.h:2344
virtual ~TT()
Definition: ttg.h:4070
output_terminalsT output_terminals_type
Definition: ttg.h:1285
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:1270
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:4437
static auto & get(InTuple &&intuple)
Definition: ttg.h:1293
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
Definition: ttg.h:2743
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Definition: ttg.h:4238
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Definition: ttg.h:4283
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4043
void set_defer_writer(bool value)
Definition: ttg.h:4341
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
Definition: ttg.h:2754
virtual void release() override
Definition: ttg.h:4099
bool get_defer_writer(bool value)
Definition: ttg.h:4345
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
Definition: ttg.h:2749
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3176
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
Definition: ttg.h:2320
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:3197
static void ht_iter_cb(void *item, void *cb_data)
Definition: ttg.h:4085
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3188
parsec_thread_mempool_t * get_task_mempool(void)
Definition: ttg.h:2002
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:3236
void do_release()
Definition: ttg.h:4101
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2678
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
Definition: ttg.h:3297
const auto & get_output_terminals() const
Definition: ttg.h:1310
static constexpr bool derived_has_device_op()
Definition: ttg.h:1258
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Definition: ttg.h:4255
auto get_devicemap()
Definition: ttg.h:4407
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:3888
decltype(keymap) const & get_keymap() const
Definition: ttg.h:4358
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:1278
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:3407
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:3591
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
Definition: ttg.h:2371
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
Definition: ttg.h:4050
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
Definition: ttg.h:1934
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:3217
static constexpr const ttg::Runtime runtime
Definition: ttg.h:4128
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
Definition: ttg.h:1957
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
Definition: ttg.h:2010
static resultT get(InTuple &&intuple)
Definition: ttg.h:1289
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:1240
void register_static_op_function(void)
Definition: ttg.h:4461
void add_constraint(Constraint &&c)
Definition: ttg.h:4428
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:3208
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:1286
void argstream_set_size_from_msg(void *data, std::size_t size)
Definition: ttg.h:2277
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
Definition: ttg.h:2332
void set_arg_from_msg(void *data, std::size_t size)
Definition: ttg.h:2071
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
Definition: ttg.h:2338
keyT key_type
Definition: ttg.h:1269
void invoke() override
Definition: ttg.h:4291
void make_executable() override
Marks this executable.
Definition: ttg.h:4350
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
Definition: ttg.h:2642
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:1277
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:1279
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:1275
decltype(priomap) const & get_priomap() const
Definition: ttg.h:4368
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
Definition: ttg.h:4327
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:3355
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Definition: ttg.h:4267
std::tuple_element_t< i, output_terminalsT > * out()
Definition: ttg.h:4232
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2398
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
Definition: ttg.h:2326
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
Definition: ttg.h:2314
static constexpr bool derived_has_hip_op()
Definition: ttg.h:1246
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
Definition: ttg.h:2618
void copy_mark_pushout(const Value &value)
Definition: ttg.h:3455
void get_from_pull_msg(void *data, std::size_t size)
Definition: ttg.h:2300
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
Definition: ttg.h:2788
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:2738
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:1252
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
Definition: ttg.h:3605
actual_input_tuple_type input_args_type
Definition: ttg.h:1271
void increment_created()
Definition: ttg.h:468
virtual void execute() override
Definition: ttg.h:409
static constexpr int parsec_ttg_rma_tag()
Definition: ttg.h:405
void decrement_inflight_msg()
Definition: ttg.h:471
WorldImpl & operator=(const WorldImpl &other)=delete
void destroy_tpool()
Definition: ttg.h:420
const ttg::Edge & ctl_edge() const
Definition: ttg.h:466
void increment_inflight_msg()
Definition: ttg.h:470
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
Definition: ttg.h:500
void create_tpool()
Definition: ttg.h:347
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
Definition: ttg.h:273
ttg::Edge & ctl_edge()
Definition: ttg.h:464
MPI_Comm comm() const
Definition: ttg.h:407
bool mpi_support(ttg::ExecutionSpace space)
Definition: ttg.h:514
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
Definition: ttg.h:529
virtual bool profiling() override
Definition: ttg.h:512
virtual void dag_off() override
Definition: ttg.h:491
virtual void fence_impl(void) override
Definition: ttg.h:568
virtual void dag_on(const std::string &filename) override
Definition: ttg.h:475
static constexpr int parsec_ttg_tag()
Definition: ttg.h:404
virtual void final_task() override
Definition: ttg.h:518
auto * taskpool()
Definition: ttg.h:345
virtual void destroy() override
Definition: ttg.h:433
virtual void profile_on() override
Definition: ttg.h:506
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
Definition: ttg.h:473
auto * execution_stream()
Definition: ttg.h:344
auto * context()
Definition: ttg.h:343
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
Definition: ttg.h:862
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
bool force_device_comm()
Definition: env.cpp:33
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:199
void set_current(int device, Stream stream)
Definition: device.h:128
void reset_current()
Definition: device.h:123
constexpr bool probe_all_v
Definition: meta.h:191
constexpr bool is_const_v
Definition: meta.h:325
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:645
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
Definition: ptr.h:277
bool & initialized_mpi()
Definition: ttg.h:226
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
Definition: ttg.h:844
bool all_devices_peer_access
Definition: ttg.h:231
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:812
int first_device_id
Definition: device.h:12
std::size_t max_inline_size
Definition: ttg.h:192
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:660
void foreach_parsec_data(Value &&value, Fn &&fn)
Definition: parsec_data.h:9
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
const parsec_symbol_t parsec_taskclass_param1
Definition: ttg.h:620
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
Definition: ttg.h:824
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:674
constexpr const int PARSEC_TTG_MAX_AM_SIZE
Definition: ttg.h:174
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:688
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:781
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
Definition: ttg.h:959
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
Definition: ttg.h:834
void transfer_ownership_impl(T &&arg, int device)
Definition: ttg.h:760
const parsec_symbol_t parsec_taskclass_param2
Definition: ttg.h:628
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:801
ttg_data_copy_t * create_new_datacopy(Value &&value)
Definition: ttg.h:716
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:790
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
Definition: thread_local.h:12
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
Definition: ttg.h:771
void release_data_copy(ttg_data_copy_t *copy)
Definition: ttg.h:906
const parsec_symbol_t parsec_taskclass_param3
Definition: ttg.h:636
const parsec_symbol_t parsec_taskclass_param0
Definition: ttg.h:612
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
void ttg_fence(ttg::World world)
Definition: ttg.h:1137
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
Definition: ttg.h:138
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
Definition: ttg.h:136
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
Definition: ttg.h:139
void ttg_finalize()
Definition: ttg.h:1123
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:1140
std::mutex static_map_mutex
Definition: ttg.h:137
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:1154
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:1158
void make_executable_hook(ttg::World &)
Definition: ttg.h:1166
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
Definition: ttg.h:134
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:1149
ttg::World ttg_default_execution_context()
Definition: ttg.h:1133
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
Definition: ttg.h:135
void ttg_execute(ttg::World world)
Definition: ttg.h:1136
void ttg_sum(ttg::World world, double &value)
Definition: ttg.h:1160
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
ExecutionSpace
denotes task execution space
Definition: execution.h:17
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
Runtime
Definition: runtimes.h:15
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:130
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
bool tracing()
returns whether tracing is enabled
Definition: trace.h:28
int rank(World world=default_execution_context())
Definition: run.h:85
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Definition: parsec-ext.h:8
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
Definition: ttg.h:4608
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
Definition: ttg.h:4542
value_copy_handler & operator=(value_copy_handler &&h)
Definition: ttg.h:4526
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
Definition: ttg.h:4583
value_copy_handler & operator=(const value_copy_handler &h)=delete
A container for types.
Definition: typelist.h:24
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
parsec_hash_table_t task_constraint_table
Definition: ttg.h:1197
parsec_hash_table_t tasks_table
Definition: ttg.h:1196
parsec_gpu_task_t * gpu_task
Definition: task.h:14
msg_header_t tt_id
Definition: ttg.h:177
static constexpr std::size_t max_payload_size
Definition: ttg.h:178
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
Definition: ttg.h:182
unsigned char bytes[max_payload_size]
Definition: ttg.h:179
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
std::array< stream_info_t, num_streams > streams
Definition: task.h:210
ttg_data_copy_t * copies[num_copies]
Definition: task.h:216
lvalue_reference_type value_ref
Definition: ttvalue.h:88
static void drop_all_ptr()
Definition: ptr.h:117
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:359
static constexpr int mutable_tag
Definition: ttg_data_copy.h:61
parsec_task_t * get_next_task() const
void set_next_task(parsec_task_t *task)
enum ttg_parsec::msg_header_t::fn_id fn_id_t
uint32_t taskpool_id
Definition: ttg.h:148
std::int8_t num_iovecs
Definition: ttg.h:152
std::size_t key_offset
Definition: ttg.h:150
msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
Definition: ttg.h:160
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13