ttg.h
Go to the documentation of this file.
1 #ifndef MADNESS_TTG_H_INCLUDED
2 #define MADNESS_TTG_H_INCLUDED
3 
4 /* set up env if this header was included directly */
5 #if !defined(TTG_IMPL_NAME)
6 #define TTG_USE_MADNESS 1
7 #endif // !defined(TTG_IMPL_NAME)
8 
9 #include "ttg/impl_selector.h"
10 
11 /* include ttg header to make symbols available in case this header is included directly */
12 #include "../../ttg.h"
13 #include "ttg/base/keymap.h"
14 #include "ttg/base/tt.h"
15 #include "ttg/func.h"
16 #include "ttg/madness/device.h"
17 #include "ttg/runtimes.h"
18 #include "ttg/tt.h"
19 #include "ttg/util/bug.h"
20 #include "ttg/util/env.h"
21 #include "ttg/util/hash.h"
22 #include "ttg/util/macro.h"
23 #include "ttg/util/meta.h"
24 #include "ttg/util/meta/callable.h"
25 #include "ttg/util/void.h"
26 #include "ttg/world.h"
27 #include "ttg/coroutine.h"
28 
#include <array>
#include <cassert>
#include <cstdlib>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>
#include <vector>
39 
40 #include <madness/world/MADworld.h>
41 #include <madness/world/world_object.h>
42 #include <madness/world/worldhashmap.h>
43 #include <madness/world/worldtypes.h>
44 
45 #include <madness/world/world_task_queue.h>
46 
47 namespace ttg_madness {
48 
49 #if 0
50  class Control;
51  class Graph;
53  class Graph {
54  public:
55  Graph() {
56  world_ = default_execution_context();
57  }
58  Graph(World& w) : world_(w) {}
59 
60 
61  private:
62  World& world_;
63  };
64 #endif
65 
66  class WorldImpl final : public ttg::base::WorldImplBase {
67  private:
68  ::madness::World &m_impl;
69  bool m_allocated = false;
70 
71  ttg::Edge<> m_ctl_edge;
72 
73  public:
74  WorldImpl(::madness::World &world) : WorldImplBase(world.size(), world.rank()), m_impl(world) {}
75 
76  WorldImpl(const SafeMPI::Intracomm &comm)
77  : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
78 
79  /* Deleted copy ctor */
80  WorldImpl(const WorldImpl &other) = delete;
81 
82  /* Deleted move ctor */
83  WorldImpl(WorldImpl &&other) = delete;
84 
85  virtual ~WorldImpl() override { destroy(); }
86 
87  /* Deleted copy assignment */
88  WorldImpl &operator=(const WorldImpl &other) = delete;
89 
90  /* Deleted move assignment */
91  WorldImpl &operator=(WorldImpl &&other) = delete;
92 
93  virtual void fence_impl(void) override { m_impl.gop.fence(); }
94 
95  ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
96 
97  const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
98 
99  virtual void destroy(void) override {
100  if (is_valid()) {
101  release_ops();
103  if (m_allocated) {
104  delete &m_impl;
105  m_allocated = false;
106  }
107  mark_invalid();
108  }
109  }
110 
111  /* Return an unmanaged reference to the implementation object */
112  ::madness::World &impl() { return m_impl; }
113 
114  const ::madness::World &impl() const { return m_impl; }
115 
116 #ifdef ENABLE_PARSEC
117  parsec_context_t *context() { return ::madness::ThreadPool::instance()->parsec; }
118 #endif
119  };
120 
121  inline void make_executable_hook(ttg::World& world) { }
122 
123  inline void ttg_initialize(int argc, char **argv, int num_threads) {
125  ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
126  auto *world_ptr = new ttg_madness::WorldImpl{madworld};
127  std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
128  ttg::World world{std::move(world_sptr)};
129  ttg::detail::set_default_world(std::move(world));
130  }
131  inline void ttg_finalize() {
132  ttg::detail::set_default_world(ttg::World{}); // reset the default world
133  ttg::detail::destroy_worlds<ttg_madness::WorldImpl>();
135  }
137  inline void ttg_abort() {
138  MPI_Abort(ttg_default_execution_context().impl().impl().mpi.Get_mpi_comm(), 1);
139  assert(0); // make sure we abort
140  }
141  inline void ttg_execute(ttg::World world) {
142  // World executes tasks eagerly
143  }
144  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
145 
146  template <typename T>
147  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
148  world.impl().register_ptr(ptr);
149  }
150 
151  template <typename T>
152  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
153  world.impl().register_ptr(std::move(ptr));
154  }
155 
156  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
157  world.impl().register_status(status_ptr);
158  }
159 
160  template <typename Callback>
161  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
162  world.impl().register_callback(std::forward<Callback>(callback));
163  }
164 
165  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
166 
167  template <typename T>
168  inline void ttg_sum(ttg::World world, T &value) {
169  world.impl().impl().gop.sum(value);
170  }
173  template <typename T>
174  inline void ttg_broadcast(ttg::World world, T &data, int source_rank) {
175  world.impl().impl().gop.broadcast_serializable(data, source_rank);
176  }
177 
186  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs>
187  class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs>> {
188  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
189  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
190  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
191  // create a virtual control input if the input list is empty, to be used in invoke()
192  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
194 
195  public:
196  using ttT = TT;
197  using key_type = keyT;
199  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
200 
201  private:
202  ttg::World world;
203  ttg::meta::detail::keymap_t<keyT> keymap;
204  ttg::meta::detail::keymap_t<keyT> priomap;
205  // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
206  ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
207  input_reducers;
208  int num_pullins = 0;
209 
210  std::array<std::size_t, std::tuple_size_v<actual_input_tuple_type>> static_streamsize;
211 
212  public:
213  ttg::World get_world() const override final { return world; }
214 
// Device-capability queries: every one of them reports `false` here.

/// @return always false
static constexpr bool derived_has_cuda_op() { return false; }

/// @return always false
static constexpr bool derived_has_hip_op() { return false; }

/// @return always false
static constexpr bool derived_has_level_zero_op() { return false; }

/// @return always false
static constexpr bool derived_has_device_op() { return false; }
234 
235  protected:
236  using worldobjT = ::madness::WorldObject<ttT>;
237 
238  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
239  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
240  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
241 
// This to support tt fusion: per-thread record of the key hash of the task being run
// and of how deeply task invocations are currently nested
inline static __thread struct {
  uint64_t key_hash = 0;  // hash of current key
  size_t call_depth = 0;  // how deep calls are nested
} threaddata;
247 
248  public:
251  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
252  static_assert(ttg::meta::is_none_void_v<input_valueTs> || ttg::meta::is_last_void_v<input_valueTs>,
253  "at most one void input can be handled, and it must come last");
254  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
256  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
258  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
259 
260  using input_args_type = actual_input_tuple_type;
261 
262  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
263  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
264  static_assert(!ttg::meta::is_any_void_v<input_values_tuple_type>);
265 
266  using output_terminals_type = output_terminalsT;
268 
269  protected:
/// Returns element \p i of \p intuple converted to \p resultT.
template <std::size_t i, typename resultT, typename InTuple>
static resultT get(InTuple &&intuple) {
  auto &&element = std::get<i>(std::forward<InTuple>(intuple));
  return static_cast<resultT>(element);
}

/// Returns a reference to element \p i of \p intuple.
template <std::size_t i, typename InTuple>
static auto &get(InTuple &&intuple) {
  auto &element = std::get<i>(std::forward<InTuple>(intuple));
  return element;
}
278 
279  private:
280  input_terminals_type input_terminals;
281  output_terminalsT output_terminals;
282 
283  protected:
284  const auto &get_output_terminals() const { return output_terminals; }
285 
286  private:
287  struct TTArgs : ::madness::TaskInterface {
288  private:
289  using TaskInterface = ::madness::TaskInterface;
290 
291  public:
292  int counter; // Tracks the number of arguments finalized
293  std::array<std::int64_t, numins>
294  nargs; // Tracks the number of expected values minus the number of received values
295  // 0 = finalized
296  // for a streaming input initialized to std::numeric_limits<std::int64_t>::max()
297  // which indicates that the value needs to be initialized
298  std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
299  // inputs (0 = unbounded stream, >0 = bounded stream)
300  input_values_tuple_type input_values; // The input values (does not include control)
301  derivedT *derived; // Pointer to derived class instance
302  bool pull_terminals_invoked = false;
303  std::conditional_t<ttg::meta::is_void_v<keyT>, ttg::Void, keyT> key; // Task key
304 #ifdef TTG_HAVE_COROUTINE
305  void *suspended_task_address = nullptr; // if not null the function is suspended
307 #endif // TTG_HAVE_COROUTINE
308 
310  template <typename Tuple, std::size_t... Is>
311  static input_refs_tuple_type make_input_refs_impl(Tuple &&inputs, std::index_sequence<Is...>) {
312  return input_refs_tuple_type{
313  get<Is, std::tuple_element_t<Is, input_refs_tuple_type>>(std::forward<Tuple>(inputs))...};
314  }
315 
317  input_refs_tuple_type make_input_refs() {
318  return make_input_refs_impl(this->input_values,
319  std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{});
320  }
321 
322  TTArgs(int prio = 0)
323  : TaskInterface(TaskAttributes(prio ? TaskAttributes::HIGHPRIORITY : 0))
324  , counter(numins)
325  , nargs()
326  , stream_size()
327  , input_values() {
328  std::fill(nargs.begin(), nargs.end(), std::numeric_limits<std::int64_t>::max());
329  }
330 
331  virtual void run(::madness::World &world) override {
332  using ttg::hash;
333  ttT::threaddata.key_hash = hash<decltype(key)>{}(key);
334  ttT::threaddata.call_depth++;
335 
336  void *suspended_task_address =
337 #ifdef TTG_HAVE_COROUTINE
338  this->suspended_task_address; // non-null = need to resume the task
339 #else // TTG_HAVE_COROUTINE
340  nullptr;
341 #endif // TTG_HAVE_COROUTINE
342  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
343  // ttg::print("starting task");
344  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
346  suspended_task_address,
347  coroutine_id,
348  derived->op(key, this->make_input_refs(),
349  derived->output_terminals)); // !!! NOTE converting input values to refs
350  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
351  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
352  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
354  suspended_task_address,
355  coroutine_id,
356  derived->op(this->make_input_refs(),
357  derived->output_terminals)); // !!! NOTE converting input values to refs
358  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
359  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
360  } else // unreachable
361  ttg::abort();
362  } else { // resume suspended coroutine
363 #ifdef TTG_HAVE_COROUTINE
364  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
365  assert(ret.ready());
366  ret.resume();
367  if (ret.completed()) {
368  ret.destroy();
369  suspended_task_address = nullptr;
370  } else { // not yet completed
371  // leave suspended_task_address as is
372  }
373  this->suspended_task_address = suspended_task_address;
374 #else // TTG_HAVE_COROUTINE
375  ttg::abort(); // should not happen
376 #endif // TTG_HAVE_COROUTINE
377  }
378 
379  ttT::threaddata.call_depth--;
380 
381  // if (suspended_task_address == nullptr) {
382  // ttg::print("finishing task",ttT::threaddata.call_depth);
383  // }
384 
385 #ifdef TTG_HAVE_COROUTINE
386  if (suspended_task_address) {
387  // TODO implement handling of suspended coroutines properly
388 
389  // only resumable_task is recognized at the moment
390  assert(coroutine_id == ttg::TaskCoroutineID::ResumableTask);
391 
392  // right now can events are not properly implemented, we are only testing the workflow with dummy events
393  // so mark the events finished manually
394  // proper thing to do is to process event queue and resubmit this task again
395  auto events =
396  static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
397  for (auto &event_ptr : events) {
398  event_ptr->finish();
399  }
400  assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
401 
402  // resume the coroutine
403  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
404  assert(ret.ready());
405  ret.resume();
406  if (ret.completed()) {
407  ret.destroy();
408  suspended_task_address = nullptr;
409  } else { // not yet completed
410  ttg::abort();
411  }
412  }
413 #endif // TTG_HAVE_COROUTINE
414  }
415 
416  virtual ~TTArgs() {} // Will be deleted via TaskInterface*
417 
418  private:
419  ::madness::Spinlock lock_; // synchronizes access to data
420  public:
421  void lock() { lock_.lock(); }
422  void unlock() { lock_.unlock(); }
423  };
424 
425  using hashable_keyT = std::conditional_t<ttg::meta::is_void_v<keyT>, int, keyT>;
426  using cacheT = ::madness::ConcurrentHashMap<hashable_keyT, TTArgs *, ttg::hash<hashable_keyT>>;
427  using accessorT = typename cacheT::accessor;
428  cacheT cache;
429 
430  protected:
431  template <typename terminalT, std::size_t i, typename Key>
432  void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args) {
433  if (in.is_pull_terminal) {
434  int owner;
435  if constexpr (!ttg::meta::is_void_v<Key>) {
436  owner = in.container.owner(key);
437  } else {
438  owner = in.container.owner();
439  }
440 
441  if (owner != world.rank()) {
442  get_terminal_data<i, Key>(owner, key);
443  } else {
444  if constexpr (!ttg::meta::is_void_v<Key>) {
445  auto value = (in.container).get(key);
446  if (args->nargs[i] == 0) {
447  ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
448  ": error argument is already finalized : ", i);
449  throw std::runtime_error("Op::set_arg called for a finalized stream");
450  }
451 
452  if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
453  this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
454  args->nargs[i] = 0;
455  args->counter--;
456  }
457  } else {
458  auto value = (in.container).get();
459  if (args->nargs[i] == 0) {
460  ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
461  ": error argument is already finalized : ", i);
462  throw std::runtime_error("Op::set_arg called for a finalized stream");
463  }
464 
465  if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
466  this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
467  args->nargs[i] = 0;
468  args->counter--;
469  }
470  }
471  }
472  }
473  }
474 
475  template <std::size_t i, typename Key>
476  void get_terminal_data(const int owner, const Key &key) {
477  if (owner != world.rank()) {
478  worldobjT::send(owner, &ttT::template get_terminal_data<i, Key>, owner, key);
479  } else {
480  auto &in = std::get<i>(input_terminals);
481  if constexpr (!ttg::meta::is_void_v<Key>) {
482  auto value = (in.container).get(key);
483  worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
484  key, value);
485  } else {
486  auto value = (in.container).get();
487  worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
488  value);
489  }
490  }
491  }
492 
493  template <std::size_t... IS, typename Key = keyT>
494  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, TTArgs *args) {
495  int junk[] = {0, (invoke_pull_terminal<typename std::tuple_element<IS, input_terminals_type>::type, IS>(
496  std::get<IS>(input_terminals), key, args),
497  0)...};
498  junk[0]++;
499  }
500 
501  // there are 6 types of set_arg:
502  // - case 1: nonvoid Key, complete Value type
503  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
504  // - case 3: nonvoid Key, void Value, no inputs
505  // - case 4: void Key, complete Value type
506  // - case 5: void Key, void Value, mixed (data+control) inputs
507  // - case 6: void Key, void Value, no inputs
508  // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
509 
510  // case 1:
511  template <std::size_t i, typename Key, typename Value>
512  void set_arg(const Key &key, Value &&value) {
513  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>; // Should be T or const T
514  static_assert(std::is_same_v<std::decay_t<Value>, std::decay_t<valueT>>,
515  "TT::set_arg(key,value) given value of type incompatible with TT");
516 
517  int owner;
518  if constexpr (!ttg::meta::is_void_v<Key>) {
519  owner = keymap(key);
520  } else {
521  owner = keymap();
522  }
523 
524  if (owner != world.rank()) {
525  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding setting argument : ", i);
526  // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
527  // BUT compiler vomits when const std::remove_reference_t<Value>& -> std::decay_t<Value>
528  // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
529  // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
530  // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
531  // move arguments) and locally
532  // here we know that this will be a remove execution, so we prepare to take rvalues;
533  // send_am will need to separate local and remote paths to deal with this
534  if constexpr (!ttg::meta::is_void_v<Key>) {
535  if constexpr (!ttg::meta::is_void_v<Value>) {
536  worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
537  } else {
538  worldobjT::send(owner, &ttT::template set_arg<i, Key, void>, key);
539  }
540  } else {
541  if constexpr (!ttg::meta::is_void_v<Value>) {
542  worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
543  } else {
544  worldobjT::send(owner, &ttT::template set_arg<i, void, void>);
545  }
546  }
547  } else {
548  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
549 
550  bool pullT_invoked = false;
551  accessorT acc;
552 
553  int prio;
554  if constexpr (!ttg::meta::is_void_v<Key>) {
555  prio = this->priomap(key);
556  if (cache.insert(acc, key)) {
557  acc->second = new TTArgs(prio); // It will be deleted by the task q
558  if (!is_lazy_pull()) {
559  // Invoke pull terminals for only the terminals with non-void values.
560  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
561  acc->second);
562  pullT_invoked = true;
563  }
564  }
565  } else {
566  prio = this->priomap();
567  if (cache.insert(acc, 0)) acc->second = new TTArgs(prio); // It will be deleted by the task q
568  }
569 
570  TTArgs *args = acc->second;
571  if (!is_lazy_pull() && pullT_invoked) args->pull_terminals_invoked = true;
572 
573  if (args->nargs[i] == 0) {
574  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
575  throw std::runtime_error("TT::set_arg called for a finalized stream");
576  }
577 
578  const auto &reducer = std::get<i>(input_reducers);
579  if (reducer) { // is this a streaming input? reduce the received value
580  // N.B. Right now reductions are done eagerly, without spawning tasks
581  // this means we must lock
582  args->lock();
583 
584  bool initialize_not_reduce = false;
585  if (args->nargs[i] == std::numeric_limits<std::int64_t>::max()) {
586  // upon first datum initialize, if needed
587  if constexpr (!ttg::meta::is_void_v<valueT>) {
588  initialize_not_reduce = true;
589  }
590 
591  // initialize nargs
592  // if we have a stream size for the op, use it first
593  if (args->stream_size[i] != 0) {
594  assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
595  args->nargs[i] = args->stream_size[i];
596  } else if (static_streamsize[i] != 0) {
597  assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
598  args->stream_size[i] = static_streamsize[i];
599  args->nargs[i] = static_streamsize[i];
600  } else {
601  args->nargs[i] = 0;
602  }
603  }
604 
605  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
606  if (initialize_not_reduce)
607  this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
608  else
609  reducer(this->get<i, std::decay_t<valueT> &>(args->input_values), value);
610  } else {
611  reducer(); // even if this was a control input, must execute the reducer for possible side effects
612  }
613 
614  // update the counter
615  args->nargs[i]--;
616 
617  // is this the last message?
618  if (args->nargs[i] == 0) args->counter--;
619 
620  args->unlock();
621  } else { // this is a nonstreaming input => set the value
622  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
623  this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
624  }
625  args->nargs[i] = 0;
626  args->counter--;
627  }
628 
629  // If lazy pulling in enabled, check it here.
630  if (numins - args->counter == num_pullins) {
631  if (is_lazy_pull() && !args->pull_terminals_invoked) {
632  // Invoke pull terminals for only the terminals with non-void values.
633  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
634  }
635  }
636 
637  // ready to run the task?
638  if (args->counter == 0) {
639  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
640  args->derived = static_cast<derivedT *>(this);
641  args->key = key;
642 
643  using ttg::hash;
644  auto curhash = hash<keyT>{}(key);
645 
646  if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
647 
648  // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
649  ttT::threaddata.call_depth++;
650  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
651  static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
652  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
653  static_cast<derivedT *>(this)->op(key, output_terminals); // Runs immediately
654  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
655  static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
656  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
657  static_cast<derivedT *>(this)->op(output_terminals); // Runs immediately
658  } else
659  ttg::abort();
660  ttT::threaddata.call_depth--;
661 
662  } else {
663  // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
664  world.impl().impl().taskq.add(args);
665  }
666 
667  cache.erase(acc);
668  }
669  }
670  }
671 
672  // case 2 and 3
673  template <std::size_t i, typename Key, typename Value>
674  std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
675  set_arg<i>(key, ttg::Void{});
676  }
677 
678  // case 4
679  template <std::size_t i, typename Key = keyT, typename Value>
680  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
681  return set_arg<i>(ttg::Void{}, std::forward<Value>(value));
682  }
683 
684  // case 5 and 6
685  template <std::size_t i, typename Key = keyT, typename Value>
686  std::enable_if_t<ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg() {
687  set_arg<i, ttg::Void, ttg::Void>(ttg::Void{}, ttg::Void{});
688  }
689 
690  // Used by invoke to set all arguments associated with a task
691  // Is: index sequence of elements in args
692  // Js: index sequence of input terminals to set
693  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
694  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
695  const Key &key, const std::tuple<Ts...> &args) {
696  static_assert(sizeof...(Js) == sizeof...(Is));
697  constexpr std::size_t js[] = {Js...};
698  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
699  junk[0]++;
700  }
701 
702  // Used by invoke to set all arguments associated with a task
703  // Is: index sequence of input terminals to set
704  template <typename Key, typename... Ts, size_t... Is>
705  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
706  const std::tuple<Ts...> &args) {
707  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
708  }
709 
710  // Used by invoke to set all arguments associated with a task
711  // Is: index sequence of elements in args
712  // Js: index sequence of input terminals to set
713  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
714  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
715  const std::tuple<Ts...> &args) {
716  static_assert(sizeof...(Js) == sizeof...(Is));
717  constexpr std::size_t js[] = {Js...};
718  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
719  junk[0]++;
720  }
721 
722  // Used by invoke to set all arguments associated with a task
723  // Is: index sequence of input terminals to set
724  template <typename Key = keyT, typename... Ts, size_t... Is>
725  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
726  const std::tuple<Ts...> &args) {
727  set_args(std::index_sequence_for<Ts...>{}, is, args);
728  }
729 
730  public:
733  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
734  std::enable_if_t<key_is_void, void> set_argstream_size(std::size_t size) {
735  // preconditions
736  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
737  assert(size > 0 && "TT::set_argstream_size(size) called with size=0");
738 
739  // body
740  const auto owner = keymap();
741  if (owner != world.rank()) {
742  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
743  worldobjT::send(owner, &ttT::template set_argstream_size<i, true>, size);
744  } else {
745  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
746 
747  accessorT acc;
748  if (cache.insert(acc, 0)) acc->second = new TTArgs(); // It will be deleted by the task q
749  TTArgs *args = acc->second;
750 
751  args->lock();
752 
753  // check if stream is already bounded
754  if (args->stream_size[i] > 0) {
755  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
756  throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
757  }
758 
759  // check if stream is already finalized
760  if (args->nargs[i] == 0) {
761  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
762  throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
763  }
764 
765  // commit changes
766  args->stream_size[i] = size;
767  // if messages already received for this key update the expected-received counter
768  const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
769  if (messages_received_already) {
770  // cannot have received more messages than expected
771  if (-(args->nargs[i]) > size) {
772  ttg::print_error(world.rank(), ":", get_name(),
773  " : error stream received more messages than specified via set_argstream_size : ", i);
774  throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
775  }
776  args->nargs[i] += size;
777  }
778  // if done, update the counter
779  if (args->nargs[i] == 0) args->counter--;
780  args->unlock();
781 
782  // ready to run the task?
783  if (args->counter == 0) {
784  ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
785  args->derived = static_cast<derivedT *>(this);
786 
787  world.impl().impl().taskq.add(args);
788 
789  cache.erase(acc);
790  }
791  }
792  }
793 
794  template <std::size_t i>
795  void set_static_argstream_size(std::size_t size) {
796  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
797  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
798 
799  ttg::trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
800 
801  // Check if stream is already bounded
802  if (static_streamsize[i] > 0) {
803  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
804  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
805  }
806 
807  // commit changes
808  static_streamsize[i] = size;
809  }
810 
815  template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
816  std::enable_if_t<!key_is_void, void> set_argstream_size(const Key &key, std::size_t size) {
817  // preconditions
818  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
819  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
820 
821  // body
822  const auto owner = keymap(key);
823  if (owner != world.rank()) {
824  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
825  worldobjT::send(owner, &ttT::template set_argstream_size<i>, key, size);
826  } else {
827  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
828 
829  accessorT acc;
830  if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
831  TTArgs *args = acc->second;
832 
833  args->lock();
834 
835  // check if stream is already bounded
836  if (args->stream_size[i] > 0) {
837  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
838  throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
839  }
840 
841  // check if stream is already finalized
842  if (args->nargs[i] == 0) {
843  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
844  throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
845  }
846 
847  // commit changes
848  args->stream_size[i] = size;
849  // if messages already received for this key update the expected-received counter
850  const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
851  if (messages_received_already) args->nargs[i] += size;
852  // if done, update the counter
853  if (args->nargs[i] == 0) args->counter--;
854 
855  args->unlock();
856 
857  // ready to run the task?
858  if (args->counter == 0) {
859  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
860  args->derived = static_cast<derivedT *>(this);
861  args->key = key;
862 
863  world.impl().impl().taskq.add(args);
864 
865  cache.erase(acc);
866  }
867  }
868  }
869 
871  template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
872  std::enable_if_t<!key_is_void, void> finalize_argstream(const Key &key) {
873  // preconditions
874  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
875 
876  // body
877  const auto owner = keymap(key);
878  if (owner != world.rank()) {
879  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
880  worldobjT::send(owner, &ttT::template finalize_argstream<i>, key);
881  } else {
882  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
883 
884  accessorT acc;
885  const auto found = cache.find(acc, key);
886  assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
887  TTGUNUSED(found);
888  TTArgs *args = acc->second;
889 
890  // check if stream is already bounded
891  if (args->stream_size[i] > 0) {
892  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
893  throw std::runtime_error("TT::finalize called for a bounded stream");
894  }
895 
896  // check if stream is already finalized
897  if (args->nargs[i] == 0) {
898  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
899  throw std::runtime_error("TT::finalize called for a finalized stream");
900  }
901 
902  // commit changes
903  args->nargs[i] = 0;
904  args->counter--;
905  // ready to run the task?
906  if (args->counter == 0) {
907  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
908  args->derived = static_cast<derivedT *>(this);
909  args->key = key;
910 
911  world.impl().impl().taskq.add(args);
912  // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
913 
914  cache.erase(acc);
915  }
916  }
917  }
918 
920  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
921  std::enable_if_t<key_is_void, void> finalize_argstream() {
922  // preconditions
923  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
924 
925  // body
926  const int owner = keymap();
927  if (owner != world.rank()) {
928  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream finalize for terminal ", i);
929  worldobjT::send(owner, &ttT::template finalize_argstream<i, true>);
930  } else {
931  ttg::trace(world.rank(), ":", get_name(), " : finalizing stream for terminal ", i);
932 
933  accessorT acc;
934  const auto found = cache.find(acc, 0);
935  assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
936  TTGUNUSED(found);
937  TTArgs *args = acc->second;
938 
939  // check if stream is already bounded
940  if (args->stream_size[i] > 0) {
941  ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
942  throw std::runtime_error("TT::finalize called for a bounded stream");
943  }
944 
945  // check if stream is already finalized
946  if (args->nargs[i] == 0) {
947  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
948  throw std::runtime_error("TT::finalize called for a finalized stream");
949  }
950 
951  // commit changes
952  args->nargs[i] = 0;
953  args->counter--;
954  // ready to run the task?
955  if (args->counter == 0) {
956  ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
957  args->derived = static_cast<derivedT *>(this);
958 
959  world.impl().impl().taskq.add(args);
960  // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
961 
962  cache.erase(acc);
963  }
964  }
965  }
966 
967  private:
968  // Copy/assign/move forbidden ... we could make it work using
969  // PIMPL for this base class. However, this instance of the base
970  // class is tied to a specific instance of a derived class a
971  // pointer to which is captured for invoking derived class
972  // functions. Thus, not only does the derived class have to be
973  // involved but we would have to do it in a thread safe way
974  // including for possibly already running tasks and remote
975  // references. This is not worth the effort ... wherever you are
976  // wanting to move/assign an TT you should be using a pointer.
977  TT(const TT &other) = delete;
978  TT &operator=(const TT &other) = delete;
979  TT(TT &&other) = delete;
980  TT &operator=(TT &&other) = delete;
981 
982  // Registers the callback for the i'th input terminal
983  template <typename terminalT, std::size_t i>
984  void register_input_callback(terminalT &input) {
985  static_assert(std::is_same_v<keyT, typename terminalT::key_type>,
986  "TT::register_input_callback(terminalT) -- incompatible terminalT");
987  using valueT = std::decay_t<typename terminalT::value_type>;
988 
989  if (input.is_pull_terminal) {
990  num_pullins++;
991  }
992 
994  // case 1: nonvoid key, nonvoid value
996  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
997  !std::is_void_v<valueT>) {
998  auto move_callback = [this](const keyT &key, valueT &&value) {
999  set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
1000  };
1001  auto send_callback = [this](const keyT &key, const valueT &value) {
1002  set_arg<i, keyT, const valueT &>(key, value);
1003  };
1004  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1005  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1006  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1007  }
1009  // case 4: void key, nonvoid value
1011  else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1012  !std::is_void_v<valueT>) {
1013  auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1014  auto send_callback = [this](const valueT &value) { set_arg<i, keyT, const valueT &>(value); };
1015  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1016  auto finalize_callback = [this]() { finalize_argstream<i>(); };
1017  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1018  }
1020  // case 2: nonvoid key, void value, mixed inputs
1021  // case 3: nonvoid key, void value, no inputs
1023  else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
1024  auto send_callback = [this](const keyT &key) { set_arg<i, keyT, void>(key); };
1025  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1026  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1027  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1028  }
1030  // case 5: void key, void value, mixed inputs
1031  // case 6: void key, void value, no inputs
1033  else if constexpr (ttg::meta::is_all_void_v<keyT, valueT> && std::is_void_v<valueT>) {
1034  auto send_callback = [this]() { set_arg<i, keyT, void>(); };
1035  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1036  auto finalize_callback = [this]() { finalize_argstream<i>(); };
1037  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1038  } else
1039  ttg::abort();
1040  }
1041 
1042  template <std::size_t... IS>
1043  void register_input_callbacks(std::index_sequence<IS...>) {
1044  int junk[] = {
1045  0,
1046  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
1047  0)...};
1048  junk[0]++;
1049  }
1050 
1051  template <std::size_t... IS, typename inedgesT>
1052  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1053  static_assert(sizeof...(IS) == std::tuple_size_v<input_terminals_type>);
1054  static_assert(std::tuple_size_v<inedgesT> == std::tuple_size_v<input_terminals_type>);
1055  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
1056  junk[0]++;
1057  ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1058  " Edges");
1059  }
1060 
1061  template <std::size_t... IS, typename outedgesT>
1062  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1063  static_assert(sizeof...(IS) == numouts);
1064  static_assert(std::tuple_size_v<outedgesT> == numouts);
1065  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
1066  junk[0]++;
1067  ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1068  " Edges");
1069  }
1070 
1071  public:
1072  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1073  typename priomapT = ttg::detail::default_priomap<keyT>>
1074  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1075  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
1076  : ttg::TTBase(name, numinedges, numouts)
1077  , static_streamsize()
1078  , worldobjT(world.impl().impl())
1079  , world(world)
1080  // if using default keymap, rebind to the given world
1081  , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1082  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1083  : decltype(keymap)(std::forward<keymapT>(keymap_)))
1084  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1085  // Cannot call these in base constructor since terminals not yet constructed
1086  if (innames.size() != numinedges) {
1087  ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1088  numinedges);
1089  throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1090  }
1091  if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1092 
1093  register_input_terminals(input_terminals, innames);
1094  register_output_terminals(output_terminals, outnames);
1095 
1096  register_input_callbacks(std::make_index_sequence<numinedges>{});
1097  }
1098 
1099  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1100  typename priomapT = ttg::detail::default_priomap<keyT>>
1101  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1102  keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1103  : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
1104  std::forward<priomapT>(priomap)) {}
1105 
1106  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1107  typename priomapT = ttg::detail::default_priomap<keyT>>
1108  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1109  const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1110  keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
1111  : ttg::TTBase(name, numinedges, numouts)
1112  , static_streamsize()
1113  , worldobjT(ttg::default_execution_context().impl().impl())
1114  , world(ttg::default_execution_context())
1115  // if using default keymap, rebind to the given world
1116  , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1117  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1118  : decltype(keymap)(std::forward<keymapT>(keymap_)))
1119  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1120  // Cannot call in base constructor since terminals not yet constructed
1121  if (innames.size() != numinedges) {
1122  ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1123  numinedges);
1124  throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1125  }
1126  if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1127 
1128  register_input_terminals(input_terminals, innames);
1129  register_output_terminals(output_terminals, outnames);
1130 
1131  connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
1132  connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
1133  // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1134  register_input_callbacks(std::make_index_sequence<numinedges>{});
1135  }
1136 
1137  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1138  typename priomapT = ttg::detail::default_priomap<keyT>>
1139  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1140  const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1141  keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1142  : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
1143  std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
1144 
1145  // Destructor checks for unexecuted tasks
1146  virtual ~TT() {
1147  if (cache.size() != 0) {
1148  std::cerr << world.rank() << ":"
1149  << "warning: unprocessed tasks in destructor of operation '" << get_name()
1150  << "' (class name = " << get_class_name() << ")" << std::endl;
1151  std::cerr << world.rank() << ":"
1152  << " T => argument assigned F => argument unassigned" << std::endl;
1153  int nprint = 0;
1154  for (auto item : cache) {
1155  if (nprint++ > 10) {
1156  std::cerr << " etc." << std::endl;
1157  break;
1158  }
1159  using ::madness::operators::operator<<;
1160  std::cerr << world.rank() << ":"
1161  << " unused: " << item.first << " : ( ";
1162  for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1163  std::cerr << ")" << std::endl;
1164  }
1165  ttg::abort();
1166  }
1167  }
1168 
1174  template <std::size_t i, typename Reducer>
1175  void set_input_reducer(Reducer &&reducer) {
1176  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
1177  std::get<i>(input_reducers) = reducer;
1178  }
1179 
1187  template <std::size_t i, typename Reducer>
1188  void set_input_reducer(Reducer &&reducer, std::size_t size) {
1189  set_input_reducer<i>(std::forward<Reducer>(reducer));
1190  set_static_argstream_size<i>(size);
1191  }
1192 
1193  template <typename Keymap>
1194  void set_keymap(Keymap &&km) {
1195  keymap = km;
1196  }
1197 
1198  auto get_priomap(void) const { return priomap; }
1199 
1203  template <typename Priomap>
1204  void set_priomap(Priomap &&pm) {
1205  priomap = std::forward<Priomap>(pm);
1206  }
1207 
1209  void make_executable() override {
1210  TTBase::make_executable();
1211  this->process_pending();
1212  }
1213 
1215 
1220  void fence() override { ttg_fence(world); }
1221 
1223  template <std::size_t i>
1224  std::tuple_element_t<i, input_terminals_type> *in() {
1225  return &std::get<i>(input_terminals);
1226  }
1227 
1229  template <std::size_t i>
1230  std::tuple_element_t<i, output_terminalsT> *out() {
1231  return &std::get<i>(output_terminals);
1232  }
1233 
1235  template <typename Key = keyT>
1236  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1237  const Key &key, const input_values_tuple_type &args) {
1239  if constexpr(!std::is_same_v<Key, key_type>) {
1240  key_type k = key; /* cast that type into the key type we know */
1241  invoke(k, args);
1242  } else {
1243  /* trigger non-void inputs */
1244  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
1245  /* trigger void inputs */
1246  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1247  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1248  }
1249  }
1250 
1252  template <typename Key = keyT>
1253  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1254  const input_values_tuple_type &args) {
1256  /* trigger non-void inputs */
1257  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
1258  /* trigger void inputs */
1259  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1260  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1261  }
1262 
1264  template <typename Key = keyT>
1265  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1266  const Key &key) {
1268  if constexpr(!std::is_same_v<Key, key_type>) {
1269  key_type k = key; /* cast that type into the key type we know */
1270  invoke(k);
1271  } else {
1272  /* trigger void inputs */
1273  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1274  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1275  }
1276  }
1277 
1279  template <typename Key = keyT>
1280  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1282  /* trigger void inputs */
1283  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1284  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1285  }
1286 
1287  void invoke() override {
1288  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
1289  invoke<keyT>();
1290  else
1291  TTBase::invoke();
1292  }
1293 
/// Does nothing; the argument is ignored.
void set_defer_writer(bool _) {
}

/// Always returns false; the argument is ignored.
bool get_defer_writer(bool _) {
  return false;
}
1297 
1300  const decltype(keymap) &get_keymap() const { return keymap; }
1301 
1305  template <typename Key>
1306  std::enable_if_t<!ttg::meta::is_void_v<Key>, int> owner(const Key &key) const {
1307  return keymap(key);
1308  }
1309 
1312  template <typename Key>
1313  std::enable_if_t<ttg::meta::is_void_v<Key>, int> owner() const {
1314  return keymap();
1315  }
1316  };
1317 
1318 #include "ttg/make_tt.h"
1319 
1320 } // namespace ttg_madness
1321 
1322 #include "ttg/madness/watch.h"
1323 #include "ttg/madness/buffer.h"
1324 #include "ttg/madness/ttvalue.h"
1325 
1326 #endif // MADNESS_TTG_H_INCLUDED
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:276
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:30
std::string get_class_name() const
Gets the demangled class name (uses RTTI)
Definition: tt.h:220
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:217
TTBase(TTBase &&other)
Definition: tt.h:115
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:84
bool is_lazy_pull()
Definition: tt.h:199
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:91
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1108
void set_arg(const Key &key, Value &&value)
Definition: ttg.h:512
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition: ttg.h:734
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition: ttg.h:1224
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:680
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:256
void get_terminal_data(const int owner, const Key &key)
Definition: ttg.h:476
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition: ttg.h:432
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:1188
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition: ttg.h:1220
void invoke() override
Definition: ttg.h:1287
static auto & get(InTuple &&intuple)
Definition: ttg.h:275
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition: ttg.h:674
static constexpr bool derived_has_hip_op()
Definition: ttg.h:221
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:872
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:714
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition: ttg.h:1306
actual_input_tuple_type input_args_type
Definition: ttg.h:260
static __thread struct ttg_madness::TT::@0 threaddata
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition: ttg.h:1230
static constexpr bool derived_has_device_op()
Definition: ttg.h:231
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:226
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:216
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition: ttg.h:686
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:263
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition: ttg.h:1236
virtual ~TT()
Definition: ttg.h:1146
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1139
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:694
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:705
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:795
decltype(keymap) const & get_keymap() const
Definition: ttg.h:1300
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition: ttg.h:1313
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition: ttg.h:1265
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:725
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:816
static resultT get(InTuple &&intuple)
Definition: ttg.h:271
static constexpr int numinedges
Definition: ttg.h:238
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:921
void set_keymap(Keymap &&km)
Definition: ttg.h:1194
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:1175
const auto & get_output_terminals() const
Definition: ttg.h:284
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:258
void set_defer_writer(bool _)
Definition: ttg.h:1294
static constexpr int numins
Definition: ttg.h:239
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition: ttg.h:1253
bool get_defer_writer(bool _)
Definition: ttg.h:1296
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1074
uint64_t key_hash
Definition: ttg.h:244
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition: ttg.h:494
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition: ttg.h:1280
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:250
::madness::WorldObject< ttT > worldobjT
Definition: ttg.h:236
void set_priomap(Priomap &&pm)
Definition: ttg.h:1204
void make_executable() override
implementation of TTBase::make_executable()
Definition: ttg.h:1209
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:262
auto get_priomap(void) const
Definition: ttg.h:1198
ttg::World get_world() const override final
Definition: ttg.h:213
output_terminalsT output_terminals_type
Definition: ttg.h:266
static constexpr int numouts
Definition: ttg.h:240
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1101
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:267
size_t call_depth
Definition: ttg.h:245
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:249
keyT key_type
Definition: ttg.h:197
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl(const WorldImpl &other)=delete
virtual void fence_impl(void) override
Definition: ttg.h:93
const ttg::Edge & ctl_edge() const
Definition: ttg.h:97
virtual ~WorldImpl() override
Definition: ttg.h:85
WorldImpl(WorldImpl &&other)=delete
WorldImpl(::madness::World &world)
Definition: ttg.h:74
ttg::Edge & ctl_edge()
Definition: ttg.h:95
::madness::World & impl()
Definition: ttg.h:112
virtual void destroy(void) override
Definition: ttg.h:99
WorldImpl & operator=(WorldImpl &&other)=delete
WorldImpl(const SafeMPI::Intracomm &comm)
Definition: ttg.h:76
const ::madness::World & impl() const
Definition: ttg.h:114
#define TTGUNUSED(x)
Definition: macro.h:6
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
this contains MADNESS-based TTG functionality
Definition: fwd.h:16
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:147
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition: ttg.h:123
void ttg_execute(ttg::World world)
Definition: ttg.h:141
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:165
void ttg_sum(ttg::World world, T &value)
Definition: ttg.h:168
void ttg_fence(ttg::World world)
Definition: ttg.h:144
void ttg_finalize()
Definition: ttg.h:131
void make_executable_hook(ttg::World &)
Definition: ttg.h:121
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:161
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition: ttg.h:174
void ttg_abort()
Definition: ttg.h:137
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:156
ttg::World ttg_default_execution_context()
Definition: ttg.h:136
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:545
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
TaskCoroutineID
Definition: coroutine.h:222
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181