ttg.h
Go to the documentation of this file.
1 #ifndef MADNESS_TTG_H_INCLUDED
2 #define MADNESS_TTG_H_INCLUDED
3 
4 /* set up env if this header was included directly */
5 #if !defined(TTG_IMPL_NAME)
6 #define TTG_USE_MADNESS 1
7 #endif // !defined(TTG_IMPL_NAME)
8 
9 #include "ttg/impl_selector.h"
10 
11 /* include ttg header to make symbols available in case this header is included directly */
12 #include "../../ttg.h"
13 #include "ttg/base/keymap.h"
14 #include "ttg/base/tt.h"
15 #include "ttg/func.h"
16 #include "ttg/madness/device.h"
17 #include "ttg/runtimes.h"
18 #include "ttg/tt.h"
19 #include "ttg/util/bug.h"
20 #include "ttg/util/env.h"
21 #include "ttg/util/hash.h"
22 #include "ttg/util/macro.h"
23 #include "ttg/util/meta.h"
24 #include "ttg/util/meta/callable.h"
25 #include "ttg/util/scope_exit.h"
26 #include "ttg/util/void.h"
27 #include "ttg/world.h"
28 #include "ttg/coroutine.h"
29 
#include <array>
#include <cassert>
#include <cstdlib>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <vector>
40 
41 #include <madness/world/MADworld.h>
42 #include <madness/world/world_object.h>
43 #include <madness/world/worldhashmap.h>
44 #include <madness/world/worldtypes.h>
45 
46 #include <madness/world/world_task_queue.h>
47 
48 namespace ttg_madness {
49 
// NOTE(review): disabled (#if 0) sketch of a Graph wrapper around a World; it is never compiled.
// If re-enabled as written it would be ill-formed: the default constructor assigns to the
// reference member world_ in its body instead of binding it in the member-initializer list.
50 #if 0
51  class Control;
52  class Graph;
54  class Graph {
55  public:
56  Graph() {
57  world_ = default_execution_context();
58  }
59  Graph(World& w) : world_(w) {}
60 
61 
62  private:
63  World& world_;
64  };
65 #endif
66 
// MADNESS implementation of a TTG world. It wraps a ::madness::World that it either borrows
// (constructed from an existing World) or creates and owns (constructed from an MPI communicator).
67  class WorldImpl final : public ttg::base::WorldImplBase {
68  private:
// The wrapped MADNESS world; destroy() deletes it only when m_allocated is true.
69  ::madness::World &m_impl;
// true iff m_impl was allocated by this object (communicator constructor).
70  bool m_allocated = false;
71 
// Control edge handed out by ctl_edge().
72  ttg::Edge<> m_ctl_edge;
73 
74  public:
// Wraps an existing MADNESS world without taking ownership of it.
75  WorldImpl(::madness::World &world) : WorldImplBase(world.size(), world.rank()), m_impl(world) {}
76 
// Creates a new MADNESS world on the given communicator; this object owns it.
77  WorldImpl(const SafeMPI::Intracomm &comm)
78  : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
79 
80  /* Deleted copy ctor */
81  WorldImpl(const WorldImpl &other) = delete;
82 
83  /* Deleted move ctor */
84  WorldImpl(WorldImpl &&other) = delete;
85 
// Tears the world down through destroy().
86  virtual ~WorldImpl() override { destroy(); }
87 
88  /* Deleted copy assignment */
89  WorldImpl &operator=(const WorldImpl &other) = delete;
90 
91  /* Deleted move assignment */
92  WorldImpl &operator=(WorldImpl &&other) = delete;
93 
// Fence implementation: delegates to the MADNESS global-operations fence (m_impl.gop.fence()).
94  virtual void fence_impl(void) override { m_impl.gop.fence(); }
95 
// Mutable / const access to the control edge owned by this world.
96  ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
97 
98  const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
99 
// Releases the ops held by the base, deletes the MADNESS world if this object created it, and
// marks the world invalid. Guarded by is_valid(); presumably mark_invalid() turns later calls
// into no-ops (base-class behavior, not visible here).
100  virtual void destroy(void) override {
101  if (is_valid()) {
102  release_ops();
104  if (m_allocated) {
105  delete &m_impl;
106  m_allocated = false;
107  }
108  mark_invalid();
109  }
110  }
111 
112  /* Return an unmanaged reference to the implementation object */
113  ::madness::World &impl() { return m_impl; }
114 
115  const ::madness::World &impl() const { return m_impl; }
116 
117 #ifdef ENABLE_PARSEC
// PaRSEC context of the MADNESS thread pool (only when built with ENABLE_PARSEC).
118  parsec_context_t *context() { return ::madness::ThreadPool::instance()->parsec; }
119 #endif
120  };
121 
122  inline void make_executable_hook(ttg::World& world) { }
123 
// Initializes MADNESS (quiet mode) with the requested number of threads, wraps the resulting
// ::madness::World in a ttg_madness::WorldImpl and installs it as the default TTG world.
124  inline void ttg_initialize(int argc, char **argv, int num_threads) {
126  ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
// Borrowing constructor: the WorldImpl does not own madworld (m_allocated stays false).
127  auto *world_ptr = new ttg_madness::WorldImpl{madworld};
// Ownership of the WorldImpl passes to this shared_ptr and then to the default ttg::World.
128  std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
129  ttg::World world{std::move(world_sptr)};
130  ttg::detail::set_default_world(std::move(world));
131  }
// Shuts TTG down: resets the default world, then destroys the worlds of this backend
// (presumably every WorldImpl still registered with ttg::detail).
// NOTE(review): no ::madness::finalize() call is visible here although ttg_initialize calls
// ::madness::initialize(); confirm MADNESS is finalized somewhere.
132  inline void ttg_finalize() {
133  ttg::detail::set_default_world(ttg::World{}); // reset the default world
134  ttg::detail::destroy_worlds<ttg_madness::WorldImpl>();
136  }
138  inline void ttg_abort() {
139  MPI_Abort(ttg_default_execution_context().impl().impl().mpi.Get_mpi_comm(), 1);
140  assert(0); // make sure we abort
141  }
142  inline void ttg_execute(ttg::World world) {
143  // World executes tasks eagerly
144  }
145  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
146 
147  template <typename T>
148  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
149  world.impl().register_ptr(ptr);
150  }
151 
152  template <typename T>
153  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
154  world.impl().register_ptr(std::move(ptr));
155  }
156 
157  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
158  world.impl().register_status(status_ptr);
159  }
160 
161  template <typename Callback>
162  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
163  world.impl().register_callback(std::forward<Callback>(callback));
164  }
165 
166  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
167 
168  template <typename T>
169  inline void ttg_sum(ttg::World world, T &value) {
170  world.impl().impl().gop.sum(value);
171  }
174  template <typename T>
175  inline void ttg_broadcast(ttg::World world, T &data, int source_rank) {
176  world.impl().impl().gop.broadcast_serializable(data, source_rank);
177  }
178 
187  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs>
188  class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs>> {
189  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
190  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
191  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
192  // create a virtual control input if the input list is empty, to be used in invoke()
193  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
195 
196  public:
197  using ttT = TT;
198  using key_type = keyT;
200  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
201 
202  private:
203  ttg::World world;
204  ttg::meta::detail::keymap_t<keyT> keymap;
205  ttg::meta::detail::keymap_t<keyT> priomap;
206  // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
207  ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
208  input_reducers;
209  int num_pullins = 0;
210 
211  std::array<std::size_t, std::tuple_size_v<actual_input_tuple_type>> static_streamsize;
212 
213  public:
214  ttg::World get_world() const override final { return world; }
215 
/// Device-op queries. The MADNESS backend always answers false, whatever derivedT provides.
/// @return false (no CUDA op support)
static constexpr bool derived_has_cuda_op() { return false; }

/// @return false (no HIP op support)
static constexpr bool derived_has_hip_op() { return false; }

/// @return false (no Level Zero op support)
static constexpr bool derived_has_level_zero_op() { return false; }

/// @return false (no device op of any kind)
static constexpr bool derived_has_device_op() { return false; }
235 
236  protected:
237  using worldobjT = ::madness::WorldObject<ttT>;
238 
239  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
240  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
241  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
242 
243  // This to support tt fusion
244  inline static __thread struct {
245  uint64_t key_hash = 0; // hash of current key
246  size_t call_depth = 0; // how deep calls are nested
248 
249  public:
252  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
253  static_assert(ttg::meta::is_none_void_v<input_valueTs> || ttg::meta::is_last_void_v<input_valueTs>,
254  "at most one void input can be handled, and it must come last");
255  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
257  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
259  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
260 
261  using input_args_type = actual_input_tuple_type;
262 
263  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
264  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
265  static_assert(!ttg::meta::is_any_void_v<input_values_tuple_type>);
266 
267  using output_terminals_type = output_terminalsT;
269 
270  protected:
/// Returns element @p i of @p intuple converted to @p resultT with static_cast
/// (resultT is typically a reference type, e.g. when building input reference tuples).
template <std::size_t i, typename resultT, typename InTuple>
static resultT get(InTuple &&intuple) {
  return static_cast<resultT>(std::get<i>(std::forward<InTuple>(intuple)));
}

/// Returns an lvalue reference to element @p i of @p intuple.
template <std::size_t i, typename InTuple>
static auto &get(InTuple &&intuple) {
  return std::get<i>(std::forward<InTuple>(intuple));
}
279 
280  private:
281  input_terminals_type input_terminals;
282  output_terminalsT output_terminals;
283 
284  protected:
285  const auto &get_output_terminals() const { return output_terminals; }
286 
287  private:
// Per-key task state and MADNESS task. It accumulates the input values of one task key in the
// TT's cache; once every input is finalized (counter == 0) TT::set_arg / set_argstream_size
// either hand it to the MADNESS task queue (which calls run()) or consume its inputs inline.
288  struct TTArgs : ::madness::TaskInterface {
289  private:
290  using TaskInterface = ::madness::TaskInterface;
291 
292  public:
293  int counter; // Number of inputs not yet finalized; starts at numins, the task is ready at 0
294  std::array<std::int64_t, numins>
295  nargs; // Per input: expected values minus received values; 0 = finalized
296  // every entry starts at std::numeric_limits<std::int64_t>::max() (see the constructor), meaning
297  // nothing received yet; a streaming input replaces it on its first value with its bound (or 0 if
298  // unbounded) and then counts down, so negative values count messages on a not-yet-bounded stream
299  std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
300  // inputs (0 = unbounded stream, >0 = bounded stream)
301  input_values_tuple_type input_values; // The input values (does not include control)
302  derivedT *derived; // Pointer to derived class instance
// Set by TT::set_arg after it eagerly invoked the pull terminals for this key (non-lazy mode).
303  bool pull_terminals_invoked = false;
304  std::conditional_t<ttg::meta::is_void_v<keyT>, ttg::Void, keyT> key; // Task key
305 #ifdef TTG_HAVE_COROUTINE
306  void *suspended_task_address = nullptr; // if not null the function is suspended
308 #endif // TTG_HAVE_COROUTINE
309 
// Builds the tuple of references to the stored inputs (element-wise via TT::get) passed to op().
311  template <typename Tuple, std::size_t... Is>
312  static input_refs_tuple_type make_input_refs_impl(Tuple &&inputs, std::index_sequence<Is...>) {
313  return input_refs_tuple_type{
314  get<Is, std::tuple_element_t<Is, input_refs_tuple_type>>(std::forward<Tuple>(inputs))...};
315  }
316 
318  input_refs_tuple_type make_input_refs() {
319  return make_input_refs_impl(this->input_values,
320  std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{});
321  }
322 
// Any nonzero prio maps to TaskAttributes::HIGHPRIORITY; all inputs start as "nothing received".
323  TTArgs(int prio = 0)
324  : TaskInterface(TaskAttributes(prio ? TaskAttributes::HIGHPRIORITY : 0))
325  , counter(numins)
326  , nargs()
327  , stream_size()
328  , input_values() {
329  std::fill(nargs.begin(), nargs.end(), std::numeric_limits<std::int64_t>::max());
330  }
331 
// Task body. Records this task's key hash and bumps the call depth in TT::threaddata (which
// TT::set_arg consults to decide whether to run a successor inline), then either calls
// derived->op with the inputs matching the key/input arity or resumes a suspended coroutine.
332  virtual void run(::madness::World &world) override {
333  using ttg::hash;
334  ttT::threaddata.key_hash = hash<decltype(key)>{}(key);
335  ttT::threaddata.call_depth++;
336 
337  void *suspended_task_address =
338 #ifdef TTG_HAVE_COROUTINE
339  this->suspended_task_address; // non-null = need to resume the task
340 #else // TTG_HAVE_COROUTINE
341  nullptr;
342 #endif // TTG_HAVE_COROUTINE
343  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
344  // ttg::print("starting task");
345  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
347  suspended_task_address,
348  coroutine_id,
349  derived->op(key, this->make_input_refs(),
350  derived->output_terminals)); // !!! NOTE converting input values to refs
351  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
352  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
353  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
355  suspended_task_address,
356  coroutine_id,
357  derived->op(this->make_input_refs(),
358  derived->output_terminals)); // !!! NOTE converting input values to refs
359  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
360  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
361  } else // unreachable
362  ttg::abort();
363  } else { // resume suspended coroutine
364 #ifdef TTG_HAVE_COROUTINE
365  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
366  assert(ret.ready());
367  ret.resume();
368  if (ret.completed()) {
369  ret.destroy();
370  suspended_task_address = nullptr;
371  } else { // not yet completed
372  // leave suspended_task_address as is
373  }
374  this->suspended_task_address = suspended_task_address;
375 #else // TTG_HAVE_COROUTINE
376  ttg::abort(); // should not happen
377 #endif // TTG_HAVE_COROUTINE
378  }
379 
380  ttT::threaddata.call_depth--;
381 
382  // if (suspended_task_address == nullptr) {
383  // ttg::print("finishing task",ttT::threaddata.call_depth);
384  // }
385 
386 #ifdef TTG_HAVE_COROUTINE
387  if (suspended_task_address) {
388  // TODO implement handling of suspended coroutines properly
389 
390  // only resumable_task is recognized at the moment
391  assert(coroutine_id == ttg::TaskCoroutineID::ResumableTask);
392 
393  // right now events are not properly implemented, we are only testing the workflow with dummy events
394  // so mark the events finished manually
395  // proper thing to do is to process event queue and resubmit this task again
396  auto events =
397  static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
398  for (auto &event_ptr : events) {
399  event_ptr->finish();
400  }
401  assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
402 
403  // resume the coroutine; it must complete now, otherwise abort
404  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
405  assert(ret.ready());
406  ret.resume();
407  if (ret.completed()) {
408  ret.destroy();
409  suspended_task_address = nullptr;
410  } else { // not yet completed
411  ttg::abort();
412  }
413  }
414 #endif // TTG_HAVE_COROUTINE
415  }
416 
417  virtual ~TTArgs() {} // Will be deleted via TaskInterface*
418 
419  private:
420  ::madness::Spinlock lock_; // synchronizes access to data
421  public:
// Used by TT for the streaming-input reduction / stream-size updates.
422  void lock() { lock_.lock(); }
423  void unlock() { lock_.unlock(); }
424  };
425 
426  using hashable_keyT = std::conditional_t<ttg::meta::is_void_v<keyT>, int, keyT>;
427  using cacheT = ::madness::ConcurrentHashMap<hashable_keyT, TTArgs *, ttg::hash<hashable_keyT>>;
428  using accessorT = typename cacheT::accessor;
429  cacheT cache;
430 
431  protected:
432  template <typename terminalT, std::size_t i, typename Key>
433  void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args) {
434  if (in.is_pull_terminal) {
435  int owner;
436  if constexpr (!ttg::meta::is_void_v<Key>) {
437  owner = in.container.owner(key);
438  } else {
439  owner = in.container.owner();
440  }
441 
442  if (owner != world.rank()) {
443  get_terminal_data<i, Key>(owner, key);
444  } else {
445  if constexpr (!ttg::meta::is_void_v<Key>) {
446  auto value = (in.container).get(key);
447  if (args->nargs[i] == 0) {
448  ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
449  ": error argument is already finalized : ", i);
450  throw std::runtime_error("Op::set_arg called for a finalized stream");
451  }
452 
453  if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
454  this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
455  args->nargs[i] = 0;
456  args->counter--;
457  }
458  } else {
459  auto value = (in.container).get();
460  if (args->nargs[i] == 0) {
461  ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
462  ": error argument is already finalized : ", i);
463  throw std::runtime_error("Op::set_arg called for a finalized stream");
464  }
465 
466  if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
467  this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
468  args->nargs[i] = 0;
469  args->counter--;
470  }
471  }
472  }
473  }
474  }
475 
476  template <std::size_t i, typename Key>
477  void get_terminal_data(const int owner, const Key &key) {
478  if (owner != world.rank()) {
479  worldobjT::send(owner, &ttT::template get_terminal_data<i, Key>, owner, key);
480  } else {
481  auto &in = std::get<i>(input_terminals);
482  if constexpr (!ttg::meta::is_void_v<Key>) {
483  auto value = (in.container).get(key);
484  worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
485  key, value);
486  } else {
487  auto value = (in.container).get();
488  worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
489  value);
490  }
491  }
492  }
493 
494  template <std::size_t... IS, typename Key = keyT>
495  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, TTArgs *args) {
496  int junk[] = {0, (invoke_pull_terminal<typename std::tuple_element<IS, input_terminals_type>::type, IS>(
497  std::get<IS>(input_terminals), key, args),
498  0)...};
499  junk[0]++;
500  }
501 
502  // there are 6 types of set_arg:
503  // - case 1: nonvoid Key, complete Value type
504  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
505  // - case 3: nonvoid Key, void Value, no inputs
506  // - case 4: void Key, complete Value type
507  // - case 5: void Key, void Value, mixed (data+control) inputs
508  // - case 6: void Key, void Value, no inputs
509  // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
510 
511  // case 1:
512  template <std::size_t i, typename Key, typename Value>
513  void set_arg(const Key &key, Value &&value) {
514  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>; // Should be T or const T
515  static_assert(std::is_same_v<std::decay_t<Value>, std::decay_t<valueT>>,
516  "TT::set_arg(key,value) given value of type incompatible with TT");
517 
518  int owner;
519  if constexpr (!ttg::meta::is_void_v<Key>) {
520  owner = keymap(key);
521  } else {
522  owner = keymap();
523  }
524 
525  if (owner != world.rank()) {
526  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding setting argument : ", i);
527  // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
528  // BUT compiler vomits when const std::remove_reference_t<Value>& -> std::decay_t<Value>
529  // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
530  // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
531  // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
532  // move arguments) and locally
533  // here we know that this will be a remove execution, so we prepare to take rvalues;
534  // send_am will need to separate local and remote paths to deal with this
535  if constexpr (!ttg::meta::is_void_v<Key>) {
536  if constexpr (!ttg::meta::is_void_v<Value>) {
537  worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
538  } else {
539  worldobjT::send(owner, &ttT::template set_arg<i, Key, void>, key);
540  }
541  } else {
542  if constexpr (!ttg::meta::is_void_v<Value>) {
543  worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
544  } else {
545  worldobjT::send(owner, &ttT::template set_arg<i, void, void>);
546  }
547  }
548  } else {
549  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
550 
551  bool pullT_invoked = false;
552  accessorT acc;
553 
554  int prio;
555  if constexpr (!ttg::meta::is_void_v<Key>) {
556  prio = this->priomap(key);
557  if (cache.insert(acc, key)) {
558  acc->second = new TTArgs(prio); // It will be deleted by the task q
559  if (!is_lazy_pull()) {
560  // Invoke pull terminals for only the terminals with non-void values.
561  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
562  acc->second);
563  pullT_invoked = true;
564  }
565  }
566  } else {
567  prio = this->priomap();
568  if (cache.insert(acc, 0)) acc->second = new TTArgs(prio); // It will be deleted by the task q
569  }
570 
571  TTArgs *args = acc->second;
572  if (!is_lazy_pull() && pullT_invoked) args->pull_terminals_invoked = true;
573 
574  if (args->nargs[i] == 0) {
575  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
576  throw std::runtime_error("TT::set_arg called for a finalized stream");
577  }
578 
579  const auto &reducer = std::get<i>(input_reducers);
580  if (reducer) { // is this a streaming input? reduce the received value
581  // N.B. Right now reductions are done eagerly, without spawning tasks
582  // this means we must lock
583  args->lock();
584 
585  bool initialize_not_reduce = false;
586  if (args->nargs[i] == std::numeric_limits<std::int64_t>::max()) {
587  // upon first datum initialize, if needed
588  if constexpr (!ttg::meta::is_void_v<valueT>) {
589  initialize_not_reduce = true;
590  }
591 
592  // initialize nargs
593  // if we have a stream size for the op, use it first
594  if (args->stream_size[i] != 0) {
595  assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
596  args->nargs[i] = args->stream_size[i];
597  } else if (static_streamsize[i] != 0) {
598  assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
599  args->stream_size[i] = static_streamsize[i];
600  args->nargs[i] = static_streamsize[i];
601  } else {
602  args->nargs[i] = 0;
603  }
604  }
605 
606  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
607  if (initialize_not_reduce)
608  this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
609  else
610  reducer(this->get<i, std::decay_t<valueT> &>(args->input_values), value);
611  } else {
612  reducer(); // even if this was a control input, must execute the reducer for possible side effects
613  }
614 
615  // update the counter
616  args->nargs[i]--;
617 
618  // is this the last message?
619  if (args->nargs[i] == 0) args->counter--;
620 
621  args->unlock();
622  } else { // this is a nonstreaming input => set the value
623  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
624  this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
625  }
626  args->nargs[i] = 0;
627  args->counter--;
628  }
629 
630  // If lazy pulling in enabled, check it here.
631  if (numins - args->counter == num_pullins) {
632  if (is_lazy_pull() && !args->pull_terminals_invoked) {
633  // Invoke pull terminals for only the terminals with non-void values.
634  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
635  }
636  }
637 
638  // ready to run the task?
639  if (args->counter == 0) {
640  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
641  args->derived = static_cast<derivedT *>(this);
642  args->key = key;
643 
644  using ttg::hash;
645  auto curhash = hash<keyT>{}(key);
646 
647  if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
648 
649  // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
650  ttT::threaddata.call_depth++;
651  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
652  static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
653  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
654  static_cast<derivedT *>(this)->op(key, output_terminals); // Runs immediately
655  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
656  static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
657  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
658  static_cast<derivedT *>(this)->op(output_terminals); // Runs immediately
659  } else
660  ttg::abort();
661  ttT::threaddata.call_depth--;
662 
663  } else {
664  // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
665  world.impl().impl().taskq.add(args);
666  }
667 
668  cache.erase(acc);
669  }
670  }
671  }
672 
673  // case 2 and 3
674  template <std::size_t i, typename Key, typename Value>
675  std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
676  set_arg<i>(key, ttg::Void{});
677  }
678 
679  // case 4
680  template <std::size_t i, typename Key = keyT, typename Value>
681  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
682  return set_arg<i>(ttg::Void{}, std::forward<Value>(value));
683  }
684 
685  // case 5 and 6
686  template <std::size_t i, typename Key = keyT, typename Value>
687  std::enable_if_t<ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg() {
688  set_arg<i, ttg::Void, ttg::Void>(ttg::Void{}, ttg::Void{});
689  }
690 
691  // Used by invoke to set all arguments associated with a task
692  // Is: index sequence of elements in args
693  // Js: index sequence of input terminals to set
694  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
695  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
696  const Key &key, const std::tuple<Ts...> &args) {
697  static_assert(sizeof...(Js) == sizeof...(Is));
698  constexpr std::size_t js[] = {Js...};
699  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
700  junk[0]++;
701  }
702 
703  // Used by invoke to set all arguments associated with a task
704  // Is: index sequence of input terminals to set
705  template <typename Key, typename... Ts, size_t... Is>
706  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
707  const std::tuple<Ts...> &args) {
708  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
709  }
710 
711  // Used by invoke to set all arguments associated with a task
712  // Is: index sequence of elements in args
713  // Js: index sequence of input terminals to set
714  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
715  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
716  const std::tuple<Ts...> &args) {
717  static_assert(sizeof...(Js) == sizeof...(Is));
718  constexpr std::size_t js[] = {Js...};
719  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
720  junk[0]++;
721  }
722 
723  // Used by invoke to set all arguments associated with a task
724  // Is: index sequence of input terminals to set
725  template <typename Key = keyT, typename... Ts, size_t... Is>
726  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
727  const std::tuple<Ts...> &args) {
728  set_args(std::index_sequence_for<Ts...>{}, is, args);
729  }
730 
731  public:
734  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
735  std::enable_if_t<key_is_void, void> set_argstream_size(std::size_t size) {
736  // preconditions
737  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
738  assert(size > 0 && "TT::set_argstream_size(size) called with size=0");
739 
740  // body
741  const auto owner = keymap();
742  if (owner != world.rank()) {
743  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
744  worldobjT::send(owner, &ttT::template set_argstream_size<i, true>, size);
745  } else {
746  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
747 
748  accessorT acc;
749  if (cache.insert(acc, 0)) acc->second = new TTArgs(); // It will be deleted by the task q
750  TTArgs *args = acc->second;
751 
752  args->lock();
753 
754  // check if stream is already bounded
755  if (args->stream_size[i] > 0) {
756  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
757  throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
758  }
759 
760  // check if stream is already finalized
761  if (args->nargs[i] == 0) {
762  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
763  throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
764  }
765 
766  // commit changes
767  args->stream_size[i] = size;
768  // if messages already received for this key update the expected-received counter
769  const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
770  if (messages_received_already) {
771  // cannot have received more messages than expected
772  if (-(args->nargs[i]) > size) {
773  ttg::print_error(world.rank(), ":", get_name(),
774  " : error stream received more messages than specified via set_argstream_size : ", i);
775  throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
776  }
777  args->nargs[i] += size;
778  }
779  // if done, update the counter
780  if (args->nargs[i] == 0) args->counter--;
781  args->unlock();
782 
783  // ready to run the task?
784  if (args->counter == 0) {
785  ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
786  args->derived = static_cast<derivedT *>(this);
787 
788  world.impl().impl().taskq.add(args);
789 
790  cache.erase(acc);
791  }
792  }
793  }
794 
795  template <std::size_t i>
796  void set_static_argstream_size(std::size_t size) {
797  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
798  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
799 
800  ttg::trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
801 
802  // Check if stream is already bounded
803  if (static_streamsize[i] > 0) {
804  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
805  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
806  }
807 
808  // commit changes
809  static_streamsize[i] = size;
810  }
811 
816  template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
817  std::enable_if_t<!key_is_void, void> set_argstream_size(const Key &key, std::size_t size) {
818  // preconditions
819  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
820  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
821 
822  // body
823  const auto owner = keymap(key);
824  if (owner != world.rank()) {
825  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
826  worldobjT::send(owner, &ttT::template set_argstream_size<i>, key, size);
827  } else {
828  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
829 
830  accessorT acc;
831  if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
832  TTArgs *args = acc->second;
833 
834  args->lock();
835 
836  // check if stream is already bounded
837  if (args->stream_size[i] > 0) {
838  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
839  throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
840  }
841 
842  // check if stream is already finalized
843  if (args->nargs[i] == 0) {
844  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
845  throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
846  }
847 
848  // commit changes
849  args->stream_size[i] = size;
850  // if messages already received for this key update the expected-received counter
851  const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
852  if (messages_received_already) args->nargs[i] += size;
853  // if done, update the counter
854  if (args->nargs[i] == 0) args->counter--;
855 
856  args->unlock();
857 
858  // ready to run the task?
859  if (args->counter == 0) {
860  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
861  args->derived = static_cast<derivedT *>(this);
862  args->key = key;
863 
864  world.impl().impl().taskq.add(args);
865 
866  cache.erase(acc);
867  }
868  }
869  }
870 
872  template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
873  std::enable_if_t<!key_is_void, void> finalize_argstream(const Key &key) {
874  // preconditions
875  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
876 
877  // body
878  const auto owner = keymap(key);
879  if (owner != world.rank()) {
880  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
881  worldobjT::send(owner, &ttT::template finalize_argstream<i>, key);
882  } else {
883  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
884 
885  accessorT acc;
886  const auto found = cache.find(acc, key);
887  assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
888  TTGUNUSED(found);
889  TTArgs *args = acc->second;
890 
891  // check if stream is already bounded
892  if (args->stream_size[i] > 0) {
893  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
894  throw std::runtime_error("TT::finalize called for a bounded stream");
895  }
896 
897  // check if stream is already finalized
898  if (args->nargs[i] == 0) {
899  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
900  throw std::runtime_error("TT::finalize called for a finalized stream");
901  }
902 
903  // commit changes
904  args->nargs[i] = 0;
905  args->counter--;
906  // ready to run the task?
907  if (args->counter == 0) {
908  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
909  args->derived = static_cast<derivedT *>(this);
910  args->key = key;
911 
912  world.impl().impl().taskq.add(args);
913  // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
914 
915  cache.erase(acc);
916  }
917  }
918  }
919 
921  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
922  std::enable_if_t<key_is_void, void> finalize_argstream() {
923  // preconditions
924  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
925 
926  // body
927  const int owner = keymap();
928  if (owner != world.rank()) {
929  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream finalize for terminal ", i);
930  worldobjT::send(owner, &ttT::template finalize_argstream<i, true>);
931  } else {
932  ttg::trace(world.rank(), ":", get_name(), " : finalizing stream for terminal ", i);
933 
934  accessorT acc;
935  const auto found = cache.find(acc, 0);
936  assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
937  TTGUNUSED(found);
938  TTArgs *args = acc->second;
939 
940  // check if stream is already bounded
941  if (args->stream_size[i] > 0) {
942  ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
943  throw std::runtime_error("TT::finalize called for a bounded stream");
944  }
945 
946  // check if stream is already finalized
947  if (args->nargs[i] == 0) {
948  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
949  throw std::runtime_error("TT::finalize called for a finalized stream");
950  }
951 
952  // commit changes
953  args->nargs[i] = 0;
954  args->counter--;
955  // ready to run the task?
956  if (args->counter == 0) {
957  ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
958  args->derived = static_cast<derivedT *>(this);
959 
960  world.impl().impl().taskq.add(args);
961  // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
962 
963  cache.erase(acc);
964  }
965  }
966  }
967 
968  private:
969  // Copy/assign/move forbidden ... we could make it work using
970  // PIMPL for this base class. However, this instance of the base
971  // class is tied to a specific instance of a derived class a
972  // pointer to which is captured for invoking derived class
973  // functions. Thus, not only does the derived class have to be
974  // involved but we would have to do it in a thread safe way
975  // including for possibly already running tasks and remote
976  // references. This is not worth the effort ... wherever you are
977  // wanting to move/assign an TT you should be using a pointer.
978  TT(const TT &other) = delete;
979  TT &operator=(const TT &other) = delete;
980  TT(TT &&other) = delete;
981  TT &operator=(TT &&other) = delete;
982 
983  // Registers the callback for the i'th input terminal
984  template <typename terminalT, std::size_t i>
985  void register_input_callback(terminalT &input) {
986  static_assert(std::is_same_v<keyT, typename terminalT::key_type>,
987  "TT::register_input_callback(terminalT) -- incompatible terminalT");
988  using valueT = std::decay_t<typename terminalT::value_type>;
989 
990  if (input.is_pull_terminal) {
991  num_pullins++;
992  }
993 
995  // case 1: nonvoid key, nonvoid value
997  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
998  !std::is_void_v<valueT>) {
999  auto move_callback = [this](const keyT &key, valueT &&value) {
1000  set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
1001  };
1002  auto send_callback = [this](const keyT &key, const valueT &value) {
1003  set_arg<i, keyT, const valueT &>(key, value);
1004  };
1005  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1006  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1007  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1008  }
1010  // case 4: void key, nonvoid value
1012  else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1013  !std::is_void_v<valueT>) {
1014  auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1015  auto send_callback = [this](const valueT &value) { set_arg<i, keyT, const valueT &>(value); };
1016  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1017  auto finalize_callback = [this]() { finalize_argstream<i>(); };
1018  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1019  }
1021  // case 2: nonvoid key, void value, mixed inputs
1022  // case 3: nonvoid key, void value, no inputs
1024  else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
1025  auto send_callback = [this](const keyT &key) { set_arg<i, keyT, void>(key); };
1026  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1027  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1028  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1029  }
1031  // case 5: void key, void value, mixed inputs
1032  // case 6: void key, void value, no inputs
1034  else if constexpr (ttg::meta::is_all_void_v<keyT, valueT> && std::is_void_v<valueT>) {
1035  auto send_callback = [this]() { set_arg<i, keyT, void>(); };
1036  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1037  auto finalize_callback = [this]() { finalize_argstream<i>(); };
1038  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1039  } else
1040  ttg::abort();
1041  }
1042 
1043  template <std::size_t... IS>
1044  void register_input_callbacks(std::index_sequence<IS...>) {
1045  int junk[] = {
1046  0,
1047  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
1048  0)...};
1049  junk[0]++;
1050  }
1051 
1052  template <std::size_t... IS, typename inedgesT>
1053  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1054  static_assert(sizeof...(IS) == std::tuple_size_v<input_terminals_type>);
1055  static_assert(std::tuple_size_v<inedgesT> == std::tuple_size_v<input_terminals_type>);
1056  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
1057  junk[0]++;
1058  ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1059  " Edges");
1060  }
1061 
1062  template <std::size_t... IS, typename outedgesT>
1063  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1064  static_assert(sizeof...(IS) == numouts);
1065  static_assert(std::tuple_size_v<outedgesT> == numouts);
1066  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
1067  junk[0]++;
1068  ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1069  " Edges");
1070  }
1071 
1072  public:
1073  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1074  typename priomapT = ttg::detail::default_priomap<keyT>>
1075  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1076  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
1077  : ttg::TTBase(name, numinedges, numouts)
1078  , static_streamsize()
1079  , worldobjT(world.impl().impl())
1080  , world(world)
1081  // if using default keymap, rebind to the given world
1082  , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1083  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1084  : decltype(keymap)(std::forward<keymapT>(keymap_)))
1085  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1086  // Cannot call these in base constructor since terminals not yet constructed
1087  if (innames.size() != numinedges) {
1088  ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1089  numinedges);
1090  throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1091  }
1092  if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1093 
1094  register_input_terminals(input_terminals, innames);
1095  register_output_terminals(output_terminals, outnames);
1096 
1097  register_input_callbacks(std::make_index_sequence<numinedges>{});
1098  }
1099 
1100  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1101  typename priomapT = ttg::detail::default_priomap<keyT>>
1102  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1103  keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1104  : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
1105  std::forward<priomapT>(priomap)) {}
1106 
1107  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1108  typename priomapT = ttg::detail::default_priomap<keyT>>
1109  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1110  const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1111  keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
1112  : ttg::TTBase(name, numinedges, numouts)
1113  , static_streamsize()
1114  , worldobjT(ttg::default_execution_context().impl().impl())
1115  , world(ttg::default_execution_context())
1116  // if using default keymap, rebind to the given world
1117  , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1118  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1119  : decltype(keymap)(std::forward<keymapT>(keymap_)))
1120  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1121  // Cannot call in base constructor since terminals not yet constructed
1122  if (innames.size() != numinedges) {
1123  ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1124  numinedges);
1125  throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1126  }
1127  if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1128 
1129  register_input_terminals(input_terminals, innames);
1130  register_output_terminals(output_terminals, outnames);
1131 
1132  connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
1133  connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
1134  // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1135  register_input_callbacks(std::make_index_sequence<numinedges>{});
1136  }
1137 
1138  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1139  typename priomapT = ttg::detail::default_priomap<keyT>>
1140  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1141  const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1142  keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1143  : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
1144  std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
1145 
1146  // Destructor checks for unexecuted tasks
1147  virtual ~TT() {
1148  if (cache.size() != 0) {
1149  std::cerr << world.rank() << ":"
1150  << "warning: unprocessed tasks in destructor of operation '" << get_name()
1151  << "' (class name = " << get_class_name() << ")" << std::endl;
1152  std::cerr << world.rank() << ":"
1153  << " T => argument assigned F => argument unassigned" << std::endl;
1154  int nprint = 0;
1155  for (auto item : cache) {
1156  if (nprint++ > 10) {
1157  std::cerr << " etc." << std::endl;
1158  break;
1159  }
1160  using ::madness::operators::operator<<;
1161  std::cerr << world.rank() << ":"
1162  << " unused: " << item.first << " : ( ";
1163  for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1164  std::cerr << ")" << std::endl;
1165  }
1166  ttg::abort();
1167  }
1168  }
1169 
1175  template <std::size_t i, typename Reducer>
1176  void set_input_reducer(Reducer &&reducer) {
1177  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
1178  std::get<i>(input_reducers) = reducer;
1179  }
1180 
1188  template <std::size_t i, typename Reducer>
1189  void set_input_reducer(Reducer &&reducer, std::size_t size) {
1190  set_input_reducer<i>(std::forward<Reducer>(reducer));
1191  set_static_argstream_size<i>(size);
1192  }
1193 
1194  template <typename Keymap>
1195  void set_keymap(Keymap &&km) {
1196  keymap = km;
1197  }
1198 
1199  auto get_priomap(void) const { return priomap; }
1200 
1204  template <typename Priomap>
1205  void set_priomap(Priomap &&pm) {
1206  priomap = std::forward<Priomap>(pm);
1207  }
1208 
1211  template<typename Constraint>
1212  void add_constraint(Constraint&& c) {
1213  /* currently a noop */
1214  }
1215 
1216  template<typename Constraint, typename Mapper>
1217  void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
1218  /* currently a noop */
1219  }
1220 
1221  template<typename Constraint, typename Mapper>
1222  void add_constraint(Constraint c, Mapper&& map) {
1223  /* currently a noop */
1224  }
1225 
1227  void make_executable() override {
1228  TTBase::make_executable();
1229  this->process_pending();
1230  }
1231 
1233 
1238  void fence() override { ttg_fence(world); }
1239 
1241  template <std::size_t i>
1242  std::tuple_element_t<i, input_terminals_type> *in() {
1243  return &std::get<i>(input_terminals);
1244  }
1245 
1247  template <std::size_t i>
1248  std::tuple_element_t<i, output_terminalsT> *out() {
1249  return &std::get<i>(output_terminals);
1250  }
1251 
1253  template <typename Key = keyT>
1254  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1255  const Key &key, const input_values_tuple_type &args) {
1257  if constexpr(!std::is_same_v<Key, key_type>) {
1258  key_type k = key; /* cast that type into the key type we know */
1259  invoke(k, args);
1260  } else {
1261  /* trigger non-void inputs */
1262  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
1263  /* trigger void inputs */
1264  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1265  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1266  }
1267  }
1268 
1270  template <typename Key = keyT>
1271  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1272  const input_values_tuple_type &args) {
1274  /* trigger non-void inputs */
1275  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
1276  /* trigger void inputs */
1277  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1278  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1279  }
1280 
1282  template <typename Key = keyT>
1283  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1284  const Key &key) {
1286  if constexpr(!std::is_same_v<Key, key_type>) {
1287  key_type k = key; /* cast that type into the key type we know */
1288  invoke(k);
1289  } else {
1290  /* trigger void inputs */
1291  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1292  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1293  }
1294  }
1295 
1297  template <typename Key = keyT>
1298  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1300  /* trigger void inputs */
1301  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1302  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1303  }
1304 
1305  void invoke() override {
1306  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
1307  invoke<keyT>();
1308  else
1309  TTBase::invoke();
1310  }
1311 
1312  void set_defer_writer(bool _) {}
1313 
1314  bool get_defer_writer(bool _) { return false; }
1315 
1318  const decltype(keymap) &get_keymap() const { return keymap; }
1319 
1323  template <typename Key>
1324  std::enable_if_t<!ttg::meta::is_void_v<Key>, int> owner(const Key &key) const {
1325  return keymap(key);
1326  }
1327 
1330  template <typename Key>
1331  std::enable_if_t<ttg::meta::is_void_v<Key>, int> owner() const {
1332  return keymap();
1333  }
1334  };
1335 
1336 #include "ttg/make_tt.h"
1337 
1338 } // namespace ttg_madness
1339 
1340 #include "ttg/madness/watch.h"
1341 #include "ttg/madness/buffer.h"
1342 #include "ttg/madness/ttvalue.h"
1343 
1344 #endif // MADNESS_TTG_H_INCLUDED
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:276
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:30
std::string get_class_name() const
Gets the demangled class name (uses RTTI)
Definition: tt.h:220
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:217
TTBase(TTBase &&other)
Definition: tt.h:115
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:84
bool is_lazy_pull()
Definition: tt.h:199
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:91
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1109
void set_arg(const Key &key, Value &&value)
Definition: ttg.h:513
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition: ttg.h:735
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition: ttg.h:1242
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:681
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:257
void get_terminal_data(const int owner, const Key &key)
Definition: ttg.h:477
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition: ttg.h:433
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:1189
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition: ttg.h:1238
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:1217
void invoke() override
Definition: ttg.h:1305
static auto & get(InTuple &&intuple)
Definition: ttg.h:276
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition: ttg.h:675
static constexpr bool derived_has_hip_op()
Definition: ttg.h:222
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:873
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:715
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition: ttg.h:1324
actual_input_tuple_type input_args_type
Definition: ttg.h:261
static __thread struct ttg_madness::TT::@0 threaddata
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition: ttg.h:1248
static constexpr bool derived_has_device_op()
Definition: ttg.h:232
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:227
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:217
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition: ttg.h:687
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:264
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition: ttg.h:1254
virtual ~TT()
Definition: ttg.h:1147
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1140
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:695
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:706
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:1222
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:796
decltype(keymap) const & get_keymap() const
Definition: ttg.h:1318
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition: ttg.h:1331
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition: ttg.h:1283
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:726
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:817
static resultT get(InTuple &&intuple)
Definition: ttg.h:272
static constexpr int numinedges
Definition: ttg.h:239
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:922
void set_keymap(Keymap &&km)
Definition: ttg.h:1195
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:1176
const auto & get_output_terminals() const
Definition: ttg.h:285
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:259
void set_defer_writer(bool _)
Definition: ttg.h:1312
static constexpr int numins
Definition: ttg.h:240
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition: ttg.h:1271
bool get_defer_writer(bool _)
Definition: ttg.h:1314
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1075
uint64_t key_hash
Definition: ttg.h:245
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition: ttg.h:495
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition: ttg.h:1298
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:251
::madness::WorldObject< ttT > worldobjT
Definition: ttg.h:237
void set_priomap(Priomap &&pm)
Definition: ttg.h:1205
void make_executable() override
implementation of TTBase::make_executable()
Definition: ttg.h:1227
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:263
auto get_priomap(void) const
Definition: ttg.h:1199
ttg::World get_world() const override final
Definition: ttg.h:214
output_terminalsT output_terminals_type
Definition: ttg.h:267
static constexpr int numouts
Definition: ttg.h:241
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1102
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:268
size_t call_depth
Definition: ttg.h:246
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:250
void add_constraint(Constraint &&c)
Definition: ttg.h:1212
keyT key_type
Definition: ttg.h:198
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl(const WorldImpl &other)=delete
virtual void fence_impl(void) override
Definition: ttg.h:94
const ttg::Edge & ctl_edge() const
Definition: ttg.h:98
virtual ~WorldImpl() override
Definition: ttg.h:86
WorldImpl(WorldImpl &&other)=delete
WorldImpl(::madness::World &world)
Definition: ttg.h:75
ttg::Edge & ctl_edge()
Definition: ttg.h:96
::madness::World & impl()
Definition: ttg.h:113
virtual void destroy(void) override
Definition: ttg.h:100
WorldImpl & operator=(WorldImpl &&other)=delete
WorldImpl(const SafeMPI::Intracomm &comm)
Definition: ttg.h:77
const ::madness::World & impl() const
Definition: ttg.h:115
#define TTGUNUSED(x)
Definition: macro.h:6
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
this contains MADNESS-based TTG functionality
Definition: fwd.h:16
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:148
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition: ttg.h:124
void ttg_execute(ttg::World world)
Definition: ttg.h:142
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:166
void ttg_sum(ttg::World world, T &value)
Definition: ttg.h:169
void ttg_fence(ttg::World world)
Definition: ttg.h:145
void ttg_finalize()
Definition: ttg.h:132
void make_executable_hook(ttg::World &)
Definition: ttg.h:122
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:162
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition: ttg.h:175
void ttg_abort()
Definition: ttg.h:138
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:157
ttg::World ttg_default_execution_context()
Definition: ttg.h:137
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:543
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
TaskCoroutineID
Definition: coroutine.h:222
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181