ttg.h
Go to the documentation of this file.
1 #ifndef MADNESS_TTG_H_INCLUDED
2 #define MADNESS_TTG_H_INCLUDED
3 
4 /* set up env if this header was included directly */
5 #if !defined(TTG_IMPL_NAME)
6 #define TTG_USE_MADNESS 1
7 #endif // !defined(TTG_IMPL_NAME)
8 
9 #include "ttg/impl_selector.h"
10 
11 #include "ttg/base/keymap.h"
12 #include "ttg/base/tt.h"
13 #include "ttg/func.h"
14 #include "ttg/madness/device.h"
15 #include "ttg/madness/devicefunc.h"
16 
17 /* needed for make_tt */
18 #include "ttg/device/task.h"
19 
20 #include "ttg/runtimes.h"
21 #include "ttg/tt.h"
22 #include "ttg/util/bug.h"
23 #include "ttg/util/env.h"
24 #include "ttg/util/hash.h"
25 #include "ttg/util/macro.h"
26 #include "ttg/util/meta.h"
27 #include "ttg/util/meta/callable.h"
28 #include "ttg/util/scope_exit.h"
29 #include "ttg/util/void.h"
30 #include "ttg/world.h"
31 #include "ttg/coroutine.h"
32 
33 /* include ttg header to make symbols available in case this header is included directly */
34 #include "../../ttg.h"
35 
#include <algorithm>
#include <array>
#include <cassert>
#include <cstdlib>
#include <functional>
#include <future>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <typeinfo>
#include <vector>
46 
47 #include <madness/world/MADworld.h>
48 #include <madness/world/world_object.h>
49 #include <madness/world/worldhashmap.h>
50 #include <madness/world/worldtypes.h>
51 
52 #include <madness/world/world_task_queue.h>
53 
54 namespace ttg_madness {
55 
56 #if 0  // NOTE(review): disabled draft, never compiled; as written Graph() would be ill-formed (reference member world_ is assigned in the body instead of being bound in the member-initializer list)
57  class Control;
58  class Graph;
60  class Graph {
61  public:
62  Graph() {
63  world_ = default_execution_context();
64  }
65  Graph(World& w) : world_(w) {}
66 
67 
68  private:
69  World& world_;
70  };
71 #endif
72 
73  class WorldImpl final : public ttg::base::WorldImplBase {
74  private:
75  ::madness::World &m_impl;
76  bool m_allocated = false;
77 
78  ttg::Edge<> m_ctl_edge;
79 
80  public:
81  WorldImpl(::madness::World &world) : WorldImplBase(world.size(), world.rank()), m_impl(world) {}
82 
83  WorldImpl(const SafeMPI::Intracomm &comm)
84  : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
85 
86  /* Deleted copy ctor */
87  WorldImpl(const WorldImpl &other) = delete;
88 
89  /* Deleted move ctor */
90  WorldImpl(WorldImpl &&other) = delete;
91 
92  virtual ~WorldImpl() override { destroy(); }
93 
94  /* Deleted copy assignment */
95  WorldImpl &operator=(const WorldImpl &other) = delete;
96 
97  /* Deleted move assignment */
98  WorldImpl &operator=(WorldImpl &&other) = delete;
99 
100  virtual void fence_impl(void) override { m_impl.gop.fence(); }
101 
102  ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
103 
104  const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
105 
106  virtual void destroy(void) override {
107  if (is_valid()) {
108  release_ops();
110  if (m_allocated) {
111  delete &m_impl;
112  m_allocated = false;
113  }
114  mark_invalid();
115  }
116  }
117 
118  /* Return an unmanaged reference to the implementation object */
119  ::madness::World &impl() { return m_impl; }
120 
121  const ::madness::World &impl() const { return m_impl; }
122 
123 #ifdef ENABLE_PARSEC
124  parsec_context_t *context() { return ::madness::ThreadPool::instance()->parsec; }
125 #endif
126  };
127 
128  inline void make_executable_hook(ttg::World& world) { }
129 
130  inline void ttg_initialize(int argc, char **argv, int num_threads) {
132  ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
133  auto *world_ptr = new ttg_madness::WorldImpl{madworld};
134  std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
135  ttg::World world{std::move(world_sptr)};
136  ttg::detail::set_default_world(std::move(world));
137  }
138  inline void ttg_finalize() {
139  ttg::detail::set_default_world(ttg::World{}); // reset the default world
140  ttg::detail::destroy_worlds<ttg_madness::WorldImpl>();
142  }
144  inline void ttg_abort() {
145  MPI_Abort(ttg_default_execution_context().impl().impl().mpi.Get_mpi_comm(), 1);
146  assert(0); // make sure we abort
147  }
148  inline void ttg_execute(ttg::World world) {
149  // World executes tasks eagerly
150  }
151  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
152 
153  template <typename T>
154  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
155  world.impl().register_ptr(ptr);
156  }
157 
158  template <typename T>
159  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
160  world.impl().register_ptr(std::move(ptr));
161  }
162 
163  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
164  world.impl().register_status(status_ptr);
165  }
166 
167  template <typename Callback>
168  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
169  world.impl().register_callback(std::forward<Callback>(callback));
170  }
171 
172  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
173 
174  template <typename T>
175  inline void ttg_sum(ttg::World world, T &value) {
176  world.impl().impl().gop.sum(value);
177  }
180  template <typename T>
181  inline void ttg_broadcast(ttg::World world, T &data, int source_rank) {
182  world.impl().impl().gop.broadcast_serializable(data, source_rank);
183  }
184 
193  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
194  class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs, Space>> {
195  static_assert(Space == ttg::ExecutionSpace::Host, "MADNESS backend only supports Host Execution Space");
196  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
197  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
198  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
199  // create a virtual control input if the input list is empty, to be used in invoke()
200  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
202 
203  public:
204  using ttT = TT;
205  using key_type = keyT;
207  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
208 
209  private:
210  ttg::World world;
211  ttg::meta::detail::keymap_t<keyT> keymap;
212  ttg::meta::detail::keymap_t<keyT> priomap;
213  // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
214  ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
215  input_reducers;
216  int num_pullins = 0;
217 
218  std::array<std::size_t, std::tuple_size_v<actual_input_tuple_type>> static_streamsize;
219 
220  public:
221  ttg::World get_world() const override final { return world; }
222 
// The MADNESS backend only supports ttg::ExecutionSpace::Host (see the static_assert on Space),
// so no derived TT is ever reported as providing a device-specific op.
static constexpr bool derived_has_cuda_op() { return false; }

static constexpr bool derived_has_hip_op() { return false; }

static constexpr bool derived_has_level_zero_op() { return false; }

static constexpr bool derived_has_device_op() { return false; }
242 
243  protected:
244  using worldobjT = ::madness::WorldObject<ttT>;
245 
246  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
247  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
248  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
249 
250  // This to support tt fusion
251  inline static __thread struct {
252  uint64_t key_hash = 0; // hash of current key
253  size_t call_depth = 0; // how deep calls are nested
255 
256  public:
259  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
260  static_assert(ttg::meta::is_none_void_v<input_valueTs> || ttg::meta::is_last_void_v<input_valueTs>,
261  "at most one void input can be handled, and it must come last");
262  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
264  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
266  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
267 
268  using input_args_type = actual_input_tuple_type;
269 
270  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
271  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
272  static_assert(!ttg::meta::is_any_void_v<input_values_tuple_type>);
273 
274  using output_terminals_type = output_terminalsT;
276 
277  protected:
/// Element @p i of @p tup, converted with static_cast to @p resultT.
template <std::size_t i, typename resultT, typename Tuple>
static resultT get(Tuple &&tup) {
  return static_cast<resultT>(std::get<i>(std::forward<Tuple>(tup)));
}

/// Reference to element @p i of @p tup.
template <std::size_t i, typename Tuple>
static auto &get(Tuple &&tup) {
  return std::get<i>(std::forward<Tuple>(tup));
}
286 
287  private:
288  input_terminals_type input_terminals;
289  output_terminalsT output_terminals;
290 
291  protected:
292  const auto &get_output_terminals() const { return output_terminals; }
293 
294  private:
295  struct TTArgs : ::madness::TaskInterface {
296  private:
297  using TaskInterface = ::madness::TaskInterface;
298 
299  public:
300  int counter; // Tracks the number of arguments finalized
301  std::array<std::int64_t, numins>
302  nargs; // Tracks the number of expected values minus the number of received values
303  // 0 = finalized
304  // for a streaming input initialized to std::numeric_limits<std::int64_t>::max()
305  // which indicates that the value needs to be initialized
306  std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
307  // inputs (0 = unbounded stream, >0 = bounded stream)
308  input_values_tuple_type input_values; // The input values (does not include control)
309  derivedT *derived; // Pointer to derived class instance
310  bool pull_terminals_invoked = false;
311  std::conditional_t<ttg::meta::is_void_v<keyT>, ttg::Void, keyT> key; // Task key
312 #ifdef TTG_HAVE_COROUTINE
313  void *suspended_task_address = nullptr; // if not null the function is suspended
315 #endif // TTG_HAVE_COROUTINE
316 
318  template <typename Tuple, std::size_t... Is>
319  static input_refs_tuple_type make_input_refs_impl(Tuple &&inputs, std::index_sequence<Is...>) {
320  return input_refs_tuple_type{
321  get<Is, std::tuple_element_t<Is, input_refs_tuple_type>>(std::forward<Tuple>(inputs))...};
322  }
323 
325  input_refs_tuple_type make_input_refs() {
326  return make_input_refs_impl(this->input_values,
327  std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{});
328  }
329 
330  TTArgs(int prio = 0)
331  : TaskInterface(TaskAttributes(prio ? TaskAttributes::HIGHPRIORITY : 0))
332  , counter(numins)
333  , nargs()
334  , stream_size()
335  , input_values() {
336  std::fill(nargs.begin(), nargs.end(), std::numeric_limits<std::int64_t>::max());
337  }
338 
339  virtual void run(::madness::World &world) override {
340  using ttg::hash;
341  ttT::threaddata.key_hash = hash<decltype(key)>{}(key);
342  ttT::threaddata.call_depth++;
343 
344  void *suspended_task_address =
345 #ifdef TTG_HAVE_COROUTINE
346  this->suspended_task_address; // non-null = need to resume the task
347 #else // TTG_HAVE_COROUTINE
348  nullptr;
349 #endif // TTG_HAVE_COROUTINE
350  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
351  // ttg::print("starting task");
352  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
354  suspended_task_address,
355  coroutine_id,
356  derived->op(key, this->make_input_refs(),
357  derived->output_terminals)); // !!! NOTE converting input values to refs
358  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
359  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
360  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
362  suspended_task_address,
363  coroutine_id,
364  derived->op(this->make_input_refs(),
365  derived->output_terminals)); // !!! NOTE converting input values to refs
366  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
367  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
368  } else // unreachable
369  ttg::abort();
370  } else { // resume suspended coroutine
371 #ifdef TTG_HAVE_COROUTINE
372  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
373  assert(ret.ready());
374  ret.resume();
375  if (ret.completed()) {
376  ret.destroy();
377  suspended_task_address = nullptr;
378  } else { // not yet completed
379  // leave suspended_task_address as is
380  }
381  this->suspended_task_address = suspended_task_address;
382 #else // TTG_HAVE_COROUTINE
383  ttg::abort(); // should not happen
384 #endif // TTG_HAVE_COROUTINE
385  }
386 
387  ttT::threaddata.call_depth--;
388 
389  // if (suspended_task_address == nullptr) {
390  // ttg::print("finishing task",ttT::threaddata.call_depth);
391  // }
392 
393 #ifdef TTG_HAVE_COROUTINE
394  if (suspended_task_address) {
395  // TODO implement handling of suspended coroutines properly
396 
397  // only resumable_task is recognized at the moment
398  assert(coroutine_id == ttg::TaskCoroutineID::ResumableTask);
399 
400  // right now can events are not properly implemented, we are only testing the workflow with dummy events
401  // so mark the events finished manually
402  // proper thing to do is to process event queue and resubmit this task again
403  auto events =
404  static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
405  for (auto &event_ptr : events) {
406  event_ptr->finish();
407  }
408  assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
409 
410  // resume the coroutine
411  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
412  assert(ret.ready());
413  ret.resume();
414  if (ret.completed()) {
415  ret.destroy();
416  suspended_task_address = nullptr;
417  } else { // not yet completed
418  ttg::abort();
419  }
420  }
421 #endif // TTG_HAVE_COROUTINE
422  }
423 
424  virtual ~TTArgs() {} // Will be deleted via TaskInterface*
425 
426  private:
427  ::madness::Spinlock lock_; // synchronizes access to data
428  public:
429  void lock() { lock_.lock(); }
430  void unlock() { lock_.unlock(); }
431  };
432 
433  using hashable_keyT = std::conditional_t<ttg::meta::is_void_v<keyT>, int, keyT>;
434  using cacheT = ::madness::ConcurrentHashMap<hashable_keyT, TTArgs *, ttg::hash<hashable_keyT>>;
435  using accessorT = typename cacheT::accessor;
436  cacheT cache;
437 
438  protected:
439  template <typename terminalT, std::size_t i, typename Key>
440  void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args) {
441  if (in.is_pull_terminal) {
442  int owner;
443  if constexpr (!ttg::meta::is_void_v<Key>) {
444  owner = in.container.owner(key);
445  } else {
446  owner = in.container.owner();
447  }
448 
449  if (owner != world.rank()) {
450  get_terminal_data<i, Key>(owner, key);
451  } else {
452  if constexpr (!ttg::meta::is_void_v<Key>) {
453  auto value = (in.container).get(key);
454  if (args->nargs[i] == 0) {
455  ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
456  ": error argument is already finalized : ", i);
457  throw std::runtime_error("Op::set_arg called for a finalized stream");
458  }
459 
460  if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
461  this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
462  args->nargs[i] = 0;
463  args->counter--;
464  }
465  } else {
466  auto value = (in.container).get();
467  if (args->nargs[i] == 0) {
468  ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
469  ": error argument is already finalized : ", i);
470  throw std::runtime_error("Op::set_arg called for a finalized stream");
471  }
472 
473  if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
474  this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
475  args->nargs[i] = 0;
476  args->counter--;
477  }
478  }
479  }
480  }
481  }
482 
483  template <std::size_t i, typename Key>
484  void get_terminal_data(const int owner, const Key &key) {
485  if (owner != world.rank()) {
486  worldobjT::send(owner, &ttT::template get_terminal_data<i, Key>, owner, key);
487  } else {
488  auto &in = std::get<i>(input_terminals);
489  if constexpr (!ttg::meta::is_void_v<Key>) {
490  auto value = (in.container).get(key);
491  worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
492  key, value);
493  } else {
494  auto value = (in.container).get();
495  worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
496  value);
497  }
498  }
499  }
500 
501  template <std::size_t... IS, typename Key = keyT>
502  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, TTArgs *args) {
503  int junk[] = {0, (invoke_pull_terminal<typename std::tuple_element<IS, input_terminals_type>::type, IS>(
504  std::get<IS>(input_terminals), key, args),
505  0)...};
506  junk[0]++;
507  }
508 
509  // there are 6 types of set_arg:
510  // - case 1: nonvoid Key, complete Value type
511  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
512  // - case 3: nonvoid Key, void Value, no inputs
513  // - case 4: void Key, complete Value type
514  // - case 5: void Key, void Value, mixed (data+control) inputs
515  // - case 6: void Key, void Value, no inputs
516  // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
517 
518  // case 1:
519  template <std::size_t i, typename Key, typename Value>
520  void set_arg(const Key &key, Value &&value) {
521  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>; // Should be T or const T
522  static_assert(std::is_same_v<std::decay_t<Value>, std::decay_t<valueT>>,
523  "TT::set_arg(key,value) given value of type incompatible with TT");
524 
525  int owner;
526  if constexpr (!ttg::meta::is_void_v<Key>) {
527  owner = keymap(key);
528  } else {
529  owner = keymap();
530  }
531 
532  if (owner != world.rank()) {
533  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding setting argument : ", i);
534  // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
535  // BUT compiler vomits when const std::remove_reference_t<Value>& -> std::decay_t<Value>
536  // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
537  // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
538  // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
539  // move arguments) and locally
540  // here we know that this will be a remove execution, so we prepare to take rvalues;
541  // send_am will need to separate local and remote paths to deal with this
542  if constexpr (!ttg::meta::is_void_v<Key>) {
543  if constexpr (!ttg::meta::is_void_v<Value>) {
544  worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
545  } else {
546  worldobjT::send(owner, &ttT::template set_arg<i, Key, void>, key);
547  }
548  } else {
549  if constexpr (!ttg::meta::is_void_v<Value>) {
550  worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
551  } else {
552  worldobjT::send(owner, &ttT::template set_arg<i, void, void>);
553  }
554  }
555  } else {
556  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
557 
558  bool pullT_invoked = false;
559  accessorT acc;
560 
561  int prio;
562  if constexpr (!ttg::meta::is_void_v<Key>) {
563  prio = this->priomap(key);
564  if (cache.insert(acc, key)) {
565  acc->second = new TTArgs(prio); // It will be deleted by the task q
566  if (!is_lazy_pull()) {
567  // Invoke pull terminals for only the terminals with non-void values.
568  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
569  acc->second);
570  pullT_invoked = true;
571  }
572  }
573  } else {
574  prio = this->priomap();
575  if (cache.insert(acc, 0)) acc->second = new TTArgs(prio); // It will be deleted by the task q
576  }
577 
578  TTArgs *args = acc->second;
579  if (!is_lazy_pull() && pullT_invoked) args->pull_terminals_invoked = true;
580 
581  if (args->nargs[i] == 0) {
582  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
583  throw std::runtime_error("TT::set_arg called for a finalized stream");
584  }
585 
586  const auto &reducer = std::get<i>(input_reducers);
587  if (reducer) { // is this a streaming input? reduce the received value
588  // N.B. Right now reductions are done eagerly, without spawning tasks
589  // this means we must lock
590  args->lock();
591 
592  bool initialize_not_reduce = false;
593  if (args->nargs[i] == std::numeric_limits<std::int64_t>::max()) {
594  // upon first datum initialize, if needed
595  if constexpr (!ttg::meta::is_void_v<valueT>) {
596  initialize_not_reduce = true;
597  }
598 
599  // initialize nargs
600  // if we have a stream size for the op, use it first
601  if (args->stream_size[i] != 0) {
602  assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
603  args->nargs[i] = args->stream_size[i];
604  } else if (static_streamsize[i] != 0) {
605  assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
606  args->stream_size[i] = static_streamsize[i];
607  args->nargs[i] = static_streamsize[i];
608  } else {
609  args->nargs[i] = 0;
610  }
611  }
612 
613  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
614  if (initialize_not_reduce)
615  this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
616  else
617  reducer(this->get<i, std::decay_t<valueT> &>(args->input_values), value);
618  } else {
619  reducer(); // even if this was a control input, must execute the reducer for possible side effects
620  }
621 
622  // update the counter
623  args->nargs[i]--;
624 
625  // is this the last message?
626  if (args->nargs[i] == 0) args->counter--;
627 
628  args->unlock();
629  } else { // this is a nonstreaming input => set the value
630  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
631  this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
632  }
633  args->nargs[i] = 0;
634  args->counter--;
635  }
636 
637  // If lazy pulling in enabled, check it here.
638  if (numins - args->counter == num_pullins) {
639  if (is_lazy_pull() && !args->pull_terminals_invoked) {
640  // Invoke pull terminals for only the terminals with non-void values.
641  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
642  }
643  }
644 
645  // ready to run the task?
646  if (args->counter == 0) {
647  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
648  args->derived = static_cast<derivedT *>(this);
649  args->key = key;
650 
651  using ttg::hash;
652  auto curhash = hash<keyT>{}(key);
653 
654  if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
655 
656  // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
657  ttT::threaddata.call_depth++;
658  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
659  static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
660  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
661  static_cast<derivedT *>(this)->op(key, output_terminals); // Runs immediately
662  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
663  static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
664  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
665  static_cast<derivedT *>(this)->op(output_terminals); // Runs immediately
666  } else
667  ttg::abort();
668  ttT::threaddata.call_depth--;
669 
670  } else {
671  // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
672  world.impl().impl().taskq.add(args);
673  }
674 
675  cache.erase(acc);
676  }
677  }
678  }
679 
680  // case 2 and 3
681  template <std::size_t i, typename Key, typename Value>
682  std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
683  set_arg<i>(key, ttg::Void{});
684  }
685 
686  // case 4
687  template <std::size_t i, typename Key = keyT, typename Value>
688  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
689  return set_arg<i>(ttg::Void{}, std::forward<Value>(value));
690  }
691 
692  // case 5 and 6
693  template <std::size_t i, typename Key = keyT, typename Value>
694  std::enable_if_t<ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg() {
695  set_arg<i, ttg::Void, ttg::Void>(ttg::Void{}, ttg::Void{});
696  }
697 
698  // Used by invoke to set all arguments associated with a task
699  // Is: index sequence of elements in args
700  // Js: index sequence of input terminals to set
701  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
702  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
703  const Key &key, const std::tuple<Ts...> &args) {
704  static_assert(sizeof...(Js) == sizeof...(Is));
705  constexpr std::size_t js[] = {Js...};
706  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
707  junk[0]++;
708  }
709 
710  // Used by invoke to set all arguments associated with a task
711  // Is: index sequence of input terminals to set
712  template <typename Key, typename... Ts, size_t... Is>
713  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
714  const std::tuple<Ts...> &args) {
715  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
716  }
717 
718  // Used by invoke to set all arguments associated with a task
719  // Is: index sequence of elements in args
720  // Js: index sequence of input terminals to set
721  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
722  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
723  const std::tuple<Ts...> &args) {
724  static_assert(sizeof...(Js) == sizeof...(Is));
725  constexpr std::size_t js[] = {Js...};
726  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
727  junk[0]++;
728  }
729 
730  // Used by invoke to set all arguments associated with a task
731  // Is: index sequence of input terminals to set
732  template <typename Key = keyT, typename... Ts, size_t... Is>
733  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
734  const std::tuple<Ts...> &args) {
735  set_args(std::index_sequence_for<Ts...>{}, is, args);
736  }
737 
738  public:
741  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
742  std::enable_if_t<key_is_void, void> set_argstream_size(std::size_t size) {
743  // preconditions
744  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
745  assert(size > 0 && "TT::set_argstream_size(size) called with size=0");
746 
747  // body
748  const auto owner = keymap();
749  if (owner != world.rank()) {
750  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
751  worldobjT::send(owner, &ttT::template set_argstream_size<i, true>, size);
752  } else {
753  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
754 
755  accessorT acc;
756  if (cache.insert(acc, 0)) acc->second = new TTArgs(); // It will be deleted by the task q
757  TTArgs *args = acc->second;
758 
759  args->lock();
760 
761  // check if stream is already bounded
762  if (args->stream_size[i] > 0) {
763  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
764  throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
765  }
766 
767  // check if stream is already finalized
768  if (args->nargs[i] == 0) {
769  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
770  throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
771  }
772 
773  // commit changes
774  args->stream_size[i] = size;
775  // if messages already received for this key update the expected-received counter
776  const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
777  if (messages_received_already) {
778  // cannot have received more messages than expected
779  if (-(args->nargs[i]) > size) {
780  ttg::print_error(world.rank(), ":", get_name(),
781  " : error stream received more messages than specified via set_argstream_size : ", i);
782  throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
783  }
784  args->nargs[i] += size;
785  }
786  // if done, update the counter
787  if (args->nargs[i] == 0) args->counter--;
788  args->unlock();
789 
790  // ready to run the task?
791  if (args->counter == 0) {
792  ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
793  args->derived = static_cast<derivedT *>(this);
794 
795  world.impl().impl().taskq.add(args);
796 
797  cache.erase(acc);
798  }
799  }
800  }
801 
802  template <std::size_t i>
803  void set_static_argstream_size(std::size_t size) {
804  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
805  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
806 
807  ttg::trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
808 
809  // Check if stream is already bounded
810  if (static_streamsize[i] > 0) {
811  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
812  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
813  }
814 
815  // commit changes
816  static_streamsize[i] = size;
817  }
818 
823  template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
824  std::enable_if_t<!key_is_void, void> set_argstream_size(const Key &key, std::size_t size) {
825  // preconditions
826  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
827  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
828 
829  // body
830  const auto owner = keymap(key);
831  if (owner != world.rank()) {
832  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
833  worldobjT::send(owner, &ttT::template set_argstream_size<i>, key, size);
834  } else {
835  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
836 
837  accessorT acc;
838  if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
839  TTArgs *args = acc->second;
840 
841  args->lock();
842 
843  // check if stream is already bounded
844  if (args->stream_size[i] > 0) {
845  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
846  throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
847  }
848 
849  // check if stream is already finalized
850  if (args->nargs[i] == 0) {
851  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
852  throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
853  }
854 
855  // commit changes
856  args->stream_size[i] = size;
857  // if messages already received for this key update the expected-received counter
858  const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
859  if (messages_received_already) args->nargs[i] += size;
860  // if done, update the counter
861  if (args->nargs[i] == 0) args->counter--;
862 
863  args->unlock();
864 
865  // ready to run the task?
866  if (args->counter == 0) {
867  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
868  args->derived = static_cast<derivedT *>(this);
869  args->key = key;
870 
871  world.impl().impl().taskq.add(args);
872 
873  cache.erase(acc);
874  }
875  }
876  }
877 
879  template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
880  std::enable_if_t<!key_is_void, void> finalize_argstream(const Key &key) {
881  // preconditions
882  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
883 
884  // body
885  const auto owner = keymap(key);
886  if (owner != world.rank()) {
887  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
888  worldobjT::send(owner, &ttT::template finalize_argstream<i>, key);
889  } else {
890  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
891 
892  accessorT acc;
893  const auto found = cache.find(acc, key);
894  assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
895  TTGUNUSED(found);
896  TTArgs *args = acc->second;
897 
898  // check if stream is already bounded
899  if (args->stream_size[i] > 0) {
900  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
901  throw std::runtime_error("TT::finalize called for a bounded stream");
902  }
903 
904  // check if stream is already finalized
905  if (args->nargs[i] == 0) {
906  ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
907  throw std::runtime_error("TT::finalize called for a finalized stream");
908  }
909 
910  // commit changes
911  args->nargs[i] = 0;
912  args->counter--;
913  // ready to run the task?
914  if (args->counter == 0) {
915  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
916  args->derived = static_cast<derivedT *>(this);
917  args->key = key;
918 
919  world.impl().impl().taskq.add(args);
920  // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
921 
922  cache.erase(acc);
923  }
924  }
925  }
926 
928  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
929  std::enable_if_t<key_is_void, void> finalize_argstream() {
930  // preconditions
931  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
932 
933  // body
934  const int owner = keymap();
935  if (owner != world.rank()) {
936  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream finalize for terminal ", i);
937  worldobjT::send(owner, &ttT::template finalize_argstream<i, true>);
938  } else {
939  ttg::trace(world.rank(), ":", get_name(), " : finalizing stream for terminal ", i);
940 
941  accessorT acc;
942  const auto found = cache.find(acc, 0);
943  assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
944  TTGUNUSED(found);
945  TTArgs *args = acc->second;
946 
947  // check if stream is already bounded
948  if (args->stream_size[i] > 0) {
949  ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
950  throw std::runtime_error("TT::finalize called for a bounded stream");
951  }
952 
953  // check if stream is already finalized
954  if (args->nargs[i] == 0) {
955  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
956  throw std::runtime_error("TT::finalize called for a finalized stream");
957  }
958 
959  // commit changes
960  args->nargs[i] = 0;
961  args->counter--;
962  // ready to run the task?
963  if (args->counter == 0) {
964  ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
965  args->derived = static_cast<derivedT *>(this);
966 
967  world.impl().impl().taskq.add(args);
968  // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
969 
970  cache.erase(acc);
971  }
972  }
973  }
974 
975  private:
976  // Copy/assign/move forbidden ... we could make it work using
977  // PIMPL for this base class. However, this instance of the base
978  // class is tied to a specific instance of a derived class a
979  // pointer to which is captured for invoking derived class
980  // functions. Thus, not only does the derived class have to be
981  // involved but we would have to do it in a thread safe way
982  // including for possibly already running tasks and remote
983  // references. This is not worth the effort ... wherever you are
984  // wanting to move/assign an TT you should be using a pointer.
985  TT(const TT &other) = delete;
986  TT &operator=(const TT &other) = delete;
987  TT(TT &&other) = delete;
988  TT &operator=(TT &&other) = delete;
989 
990  // Registers the callback for the i'th input terminal
991  template <typename terminalT, std::size_t i>
992  void register_input_callback(terminalT &input) {
993  static_assert(std::is_same_v<keyT, typename terminalT::key_type>,
994  "TT::register_input_callback(terminalT) -- incompatible terminalT");
995  using valueT = std::decay_t<typename terminalT::value_type>;
996 
997  if (input.is_pull_terminal) {
998  num_pullins++;
999  }
1000 
1002  // case 1: nonvoid key, nonvoid value
1004  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1005  !std::is_void_v<valueT>) {
1006  auto move_callback = [this](const keyT &key, valueT &&value) {
1007  set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
1008  };
1009  auto send_callback = [this](const keyT &key, const valueT &value) {
1010  set_arg<i, keyT, const valueT &>(key, value);
1011  };
1012  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1013  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1014  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1015  }
1017  // case 4: void key, nonvoid value
1019  else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1020  !std::is_void_v<valueT>) {
1021  auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1022  auto send_callback = [this](const valueT &value) { set_arg<i, keyT, const valueT &>(value); };
1023  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1024  auto finalize_callback = [this]() { finalize_argstream<i>(); };
1025  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1026  }
1028  // case 2: nonvoid key, void value, mixed inputs
1029  // case 3: nonvoid key, void value, no inputs
1031  else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
1032  auto send_callback = [this](const keyT &key) { set_arg<i, keyT, void>(key); };
1033  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1034  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1035  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1036  }
1038  // case 5: void key, void value, mixed inputs
1039  // case 6: void key, void value, no inputs
1041  else if constexpr (ttg::meta::is_all_void_v<keyT, valueT> && std::is_void_v<valueT>) {
1042  auto send_callback = [this]() { set_arg<i, keyT, void>(); };
1043  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1044  auto finalize_callback = [this]() { finalize_argstream<i>(); };
1045  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1046  } else
1047  ttg::abort();
1048  }
1049 
1050  template <std::size_t... IS>
1051  void register_input_callbacks(std::index_sequence<IS...>) {
1052  int junk[] = {
1053  0,
1054  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
1055  0)...};
1056  junk[0]++;
1057  }
1058 
1059  template <std::size_t... IS, typename inedgesT>
1060  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1061  static_assert(sizeof...(IS) == std::tuple_size_v<input_terminals_type>);
1062  static_assert(std::tuple_size_v<inedgesT> == std::tuple_size_v<input_terminals_type>);
1063  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
1064  junk[0]++;
1065  ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1066  " Edges");
1067  }
1068 
1069  template <std::size_t... IS, typename outedgesT>
1070  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1071  static_assert(sizeof...(IS) == numouts);
1072  static_assert(std::tuple_size_v<outedgesT> == numouts);
1073  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
1074  junk[0]++;
1075  ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1076  " Edges");
1077  }
1078 
1079  public:
1080  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1081  typename priomapT = ttg::detail::default_priomap<keyT>>
1082  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1083  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
1084  : ttg::TTBase(name, numinedges, numouts)
1085  , static_streamsize()
1086  , worldobjT(world.impl().impl())
1087  , world(world)
1088  // if using default keymap, rebind to the given world
1089  , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1090  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1091  : decltype(keymap)(std::forward<keymapT>(keymap_)))
1092  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1093  // Cannot call these in base constructor since terminals not yet constructed
1094  if (innames.size() != numinedges) {
1095  ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1096  numinedges);
1097  throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1098  }
1099  if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1100 
1101  register_input_terminals(input_terminals, innames);
1102  register_output_terminals(output_terminals, outnames);
1103 
1104  register_input_callbacks(std::make_index_sequence<numinedges>{});
1105  }
1106 
1107  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1108  typename priomapT = ttg::detail::default_priomap<keyT>>
1109  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1110  keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1111  : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
1112  std::forward<priomapT>(priomap)) {}
1113 
1114  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1115  typename priomapT = ttg::detail::default_priomap<keyT>>
1116  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1117  const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1118  keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
1119  : ttg::TTBase(name, numinedges, numouts)
1120  , static_streamsize()
1121  , worldobjT(ttg::default_execution_context().impl().impl())
1122  , world(ttg::default_execution_context())
1123  // if using default keymap, rebind to the given world
1124  , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1125  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1126  : decltype(keymap)(std::forward<keymapT>(keymap_)))
1127  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1128  // Cannot call in base constructor since terminals not yet constructed
1129  if (innames.size() != numinedges) {
1130  ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1131  numinedges);
1132  throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1133  }
1134  if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1135 
1136  register_input_terminals(input_terminals, innames);
1137  register_output_terminals(output_terminals, outnames);
1138 
1139  connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
1140  connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
1141  // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1142  register_input_callbacks(std::make_index_sequence<numinedges>{});
1143  }
1144 
1145  template <typename keymapT = ttg::detail::default_keymap<keyT>,
1146  typename priomapT = ttg::detail::default_priomap<keyT>>
1147  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1148  const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1149  keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1150  : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
1151  std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
1152 
1153  // Destructor checks for unexecuted tasks
1154  virtual ~TT() {
1155  if (cache.size() != 0) {
1156  std::cerr << world.rank() << ":"
1157  << "warning: unprocessed tasks in destructor of operation '" << get_name()
1158  << "' (class name = " << get_class_name() << ")" << std::endl;
1159  std::cerr << world.rank() << ":"
1160  << " T => argument assigned F => argument unassigned" << std::endl;
1161  int nprint = 0;
1162  for (auto item : cache) {
1163  if (nprint++ > 10) {
1164  std::cerr << " etc." << std::endl;
1165  break;
1166  }
1167  using ::madness::operators::operator<<;
1168  std::cerr << world.rank() << ":"
1169  << " unused: " << item.first << " : ( ";
1170  for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1171  std::cerr << ")" << std::endl;
1172  }
1173  ttg::abort();
1174  }
1175  }
1176 
1182  template <std::size_t i, typename Reducer>
1183  void set_input_reducer(Reducer &&reducer) {
1184  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
1185  std::get<i>(input_reducers) = reducer;
1186  }
1187 
1195  template <std::size_t i, typename Reducer>
1196  void set_input_reducer(Reducer &&reducer, std::size_t size) {
1197  set_input_reducer<i>(std::forward<Reducer>(reducer));
1198  set_static_argstream_size<i>(size);
1199  }
1200 
1201  template <typename Keymap>
1202  void set_keymap(Keymap &&km) {
1203  keymap = km;
1204  }
1205 
1206  auto get_priomap(void) const { return priomap; }
1207 
1211  template <typename Priomap>
1212  void set_priomap(Priomap &&pm) {
1213  priomap = std::forward<Priomap>(pm);
1214  }
1215 
1218  template<typename Constraint>
1219  void add_constraint(Constraint&& c) {
1220  /* currently a noop */
1221  }
1222 
1223  template<typename Constraint, typename Mapper>
1224  void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
1225  /* currently a noop */
1226  }
1227 
1228  template<typename Constraint, typename Mapper>
1229  void add_constraint(Constraint c, Mapper&& map) {
1230  /* currently a noop */
1231  }
1232 
1234  void make_executable() override {
1235  TTBase::make_executable();
1236  this->process_pending();
1237  }
1238 
1240 
1245  void fence() override { ttg_fence(world); }
1246 
1248  template <std::size_t i>
1249  std::tuple_element_t<i, input_terminals_type> *in() {
1250  return &std::get<i>(input_terminals);
1251  }
1252 
1254  template <std::size_t i>
1255  std::tuple_element_t<i, output_terminalsT> *out() {
1256  return &std::get<i>(output_terminals);
1257  }
1258 
1260  template <typename Key = keyT>
1261  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1262  const Key &key, const input_values_tuple_type &args) {
1264  if constexpr(!std::is_same_v<Key, key_type>) {
1265  key_type k = key; /* cast that type into the key type we know */
1266  invoke(k, args);
1267  } else {
1268  /* trigger non-void inputs */
1269  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
1270  /* trigger void inputs */
1271  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1272  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1273  }
1274  }
1275 
1277  template <typename Key = keyT>
1278  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1279  const input_values_tuple_type &args) {
1281  /* trigger non-void inputs */
1282  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
1283  /* trigger void inputs */
1284  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1285  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1286  }
1287 
1289  template <typename Key = keyT>
1290  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1291  const Key &key) {
1293  if constexpr(!std::is_same_v<Key, key_type>) {
1294  key_type k = key; /* cast that type into the key type we know */
1295  invoke(k);
1296  } else {
1297  /* trigger void inputs */
1298  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1299  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1300  }
1301  }
1302 
1304  template <typename Key = keyT>
1305  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1307  /* trigger void inputs */
1308  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
1309  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1310  }
1311 
1312  void invoke() override {
1313  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
1314  invoke<keyT>();
1315  else
1316  TTBase::invoke();
1317  }
1318 
1319  void set_defer_writer(bool _) {}
1320 
1321  bool get_defer_writer(bool _) { return false; }
1322 
1325  const decltype(keymap) &get_keymap() const { return keymap; }
1326 
1330  template <typename Key>
1331  std::enable_if_t<!ttg::meta::is_void_v<Key>, int> owner(const Key &key) const {
1332  return keymap(key);
1333  }
1334 
1337  template <typename Key>
1338  std::enable_if_t<ttg::meta::is_void_v<Key>, int> owner() const {
1339  return keymap();
1340  }
1341  };
1342 
1343 #include "ttg/make_tt.h"
1344 
1345 } // namespace ttg_madness
1346 
1347 #include "ttg/madness/watch.h"
1348 #include "ttg/madness/buffer.h"
1349 #include "ttg/madness/ttvalue.h"
1350 
1351 #endif // MADNESS_TTG_H_INCLUDED
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:277
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:31
std::string get_class_name() const
Gets the demangled class name (uses RTTI)
Definition: tt.h:221
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:218
TTBase(TTBase &&other)
Definition: tt.h:116
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:85
bool is_lazy_pull()
Definition: tt.h:200
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:92
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
void set_keymap(Keymap &&km)
Definition: ttg.h:1202
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:257
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:713
actual_input_tuple_type input_args_type
Definition: ttg.h:268
void set_priomap(Priomap &&pm)
Definition: ttg.h:1212
void set_defer_writer(bool _)
Definition: ttg.h:1319
void set_arg(const Key &key, Value &&value)
Definition: ttg.h:520
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition: ttg.h:1245
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:1196
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:1229
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:688
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1082
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:258
static resultT get(InTuple &&intuple)
Definition: ttg.h:279
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:1224
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:733
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:224
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition: ttg.h:742
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition: ttg.h:1278
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition: ttg.h:1305
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:234
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1116
static constexpr int numouts
Definition: ttg.h:248
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1147
void get_terminal_data(const int owner, const Key &key)
Definition: ttg.h:484
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:266
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:1183
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:929
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:824
static auto & get(InTuple &&intuple)
Definition: ttg.h:283
virtual ~TT()
Definition: ttg.h:1154
size_t call_depth
Definition: ttg.h:253
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition: ttg.h:1331
static constexpr int numinedges
Definition: ttg.h:246
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:264
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition: ttg.h:694
output_terminalsT output_terminals_type
Definition: ttg.h:274
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:270
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition: ttg.h:1338
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:275
static constexpr bool derived_has_hip_op()
Definition: ttg.h:229
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:271
uint64_t key_hash
Definition: ttg.h:252
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition: ttg.h:1255
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition: ttg.h:682
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1109
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition: ttg.h:1261
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition: ttg.h:502
void invoke() override
Definition: ttg.h:1312
decltype(keymap) const & get_keymap() const
Definition: ttg.h:1325
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:803
void make_executable() override
implementation of TTBase::make_executable()
Definition: ttg.h:1234
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:880
keyT key_type
Definition: ttg.h:205
static constexpr bool derived_has_device_op()
Definition: ttg.h:239
::madness::WorldObject< ttT > worldobjT
Definition: ttg.h:244
auto get_priomap(void) const
Definition: ttg.h:1206
void add_constraint(Constraint &&c)
Definition: ttg.h:1219
ttg::World get_world() const override final
Definition: ttg.h:221
static __thread struct ttg_madness::TT::@0 threaddata
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition: ttg.h:1290
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition: ttg.h:440
static constexpr int numins
Definition: ttg.h:247
bool get_defer_writer(bool _)
Definition: ttg.h:1321
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:722
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition: ttg.h:1249
const auto & get_output_terminals() const
Definition: ttg.h:292
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:702
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl(const WorldImpl &other)=delete
virtual void fence_impl(void) override
Definition: ttg.h:100
const ttg::Edge & ctl_edge() const
Definition: ttg.h:104
virtual ~WorldImpl() override
Definition: ttg.h:92
WorldImpl(WorldImpl &&other)=delete
WorldImpl(::madness::World &world)
Definition: ttg.h:81
ttg::Edge & ctl_edge()
Definition: ttg.h:102
::madness::World & impl()
Definition: ttg.h:119
virtual void destroy(void) override
Definition: ttg.h:106
WorldImpl & operator=(WorldImpl &&other)=delete
WorldImpl(const SafeMPI::Intracomm &comm)
Definition: ttg.h:83
const ::madness::World & impl() const
Definition: ttg.h:121
#define TTGUNUSED(x)
Definition: macro.h:6
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
this contains MADNESS-based TTG functionality
Definition: fwd.h:16
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:154
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition: ttg.h:130
void ttg_execute(ttg::World world)
Definition: ttg.h:148
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:172
void ttg_sum(ttg::World world, T &value)
Definition: ttg.h:175
void ttg_fence(ttg::World world)
Definition: ttg.h:151
void ttg_finalize()
Definition: ttg.h:138
void make_executable_hook(ttg::World &)
Definition: ttg.h:128
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:168
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition: ttg.h:181
void ttg_abort()
Definition: ttg.h:144
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:163
ttg::World ttg_default_execution_context()
Definition: ttg.h:143
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:543
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
TaskCoroutineID
Definition: coroutine.h:222
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181