ttg 1.0.0
Template Task Graph (TTG): flowgraph-based programming model for high-performance distributed-memory algorithms
Loading...
Searching...
No Matches
ttg.h
Go to the documentation of this file.
1// SPDX-License-Identifier: BSD-3-Clause
2#ifndef MADNESS_TTG_H_INCLUDED
3#define MADNESS_TTG_H_INCLUDED
4
5/* set up env if this header was included directly */
6#if !defined(TTG_IMPL_NAME)
7#define TTG_USE_MADNESS 1
8#endif // !defined(TTG_IMPL_NAME)
9
10#include "ttg/impl_selector.h"
11
12#include "ttg/base/keymap.h"
13#include "ttg/base/tt.h"
14#include "ttg/func.h"
15#include "ttg/madness/device.h"
17
18/* needed for make_tt */
19#include "ttg/device/task.h"
20
21#include "ttg/runtimes.h"
22#include "ttg/tt.h"
23#include "ttg/util/bug.h"
24#include "ttg/util/env.h"
25#include "ttg/util/hash.h"
26#include "ttg/util/macro.h"
27#include "ttg/util/meta.h"
29#include "ttg/util/scope_exit.h"
30#include "ttg/util/void.h"
31#include "ttg/world.h"
32#include "ttg/coroutine.h"
33
34/* include ttg header to make symbols available in case this header is included directly */
35#include "../../ttg.h"
36
37#include <array>
38#include <cassert>
39#include <functional>
40#include <future>
41#include <iostream>
42#include <map>
43#include <memory>
44#include <string>
45#include <tuple>
46#include <vector>
47
48#include <madness/world/MADworld.h>
49#include <madness/world/world_object.h>
50#include <madness/world/worldhashmap.h>
51#include <madness/world/worldtypes.h>
52
53#include <madness/world/world_task_queue.h>
54
55namespace ttg_madness {
56
57#if 0
58 class Control;
59 class Graph;
61 class Graph {
62 public:
63 Graph() {
64 world_ = default_execution_context();
65 }
66 Graph(World& w) : world_(w) {}
67
68
69 private:
70 World& world_;
71 };
72#endif
73
74 class WorldImpl final : public ttg::base::WorldImplBase {
75 private:
76 ::madness::World &m_impl;
77 bool m_allocated = false;
78
79 ttg::Edge<> m_ctl_edge;
80
81 public:
82 WorldImpl(::madness::World &world) : WorldImplBase(world.size(), world.rank()), m_impl(world) {}
83
84 WorldImpl(const SafeMPI::Intracomm &comm)
85 : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
86
87 /* Deleted copy ctor */
88 WorldImpl(const WorldImpl &other) = delete;
89
90 /* Deleted move ctor */
91 WorldImpl(WorldImpl &&other) = delete;
92
93 virtual ~WorldImpl() override { destroy(); }
94
95 /* Deleted copy assignment */
96 WorldImpl &operator=(const WorldImpl &other) = delete;
97
98 /* Deleted move assignment */
99 WorldImpl &operator=(WorldImpl &&other) = delete;
100
101 virtual void fence_impl(void) override { m_impl.gop.fence(); }
102
103 ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
104
105 const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
106
107 virtual void destroy(void) override {
108 if (is_valid()) {
109 release_ops();
111 if (m_allocated) {
112 delete &m_impl;
113 m_allocated = false;
114 }
115 mark_invalid();
116 }
117 }
118
119 /* Return an unmanaged reference to the implementation object */
120 ::madness::World &impl() { return m_impl; }
121
122 const ::madness::World &impl() const { return m_impl; }
123
124#ifdef ENABLE_PARSEC
125 parsec_context_t *context() { return ::madness::ThreadPool::instance()->parsec; }
126#endif
127 };
128
129 inline void make_executable_hook(ttg::World& world) { }
130
131 inline void ttg_initialize(int argc, char **argv, int num_threads) {
132 if (num_threads < 1) num_threads = ttg::detail::num_threads();
133 ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
134 auto *world_ptr = new ttg_madness::WorldImpl{madworld};
135 std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
136 ttg::World world{std::move(world_sptr)};
137 ttg::detail::set_default_world(std::move(world));
138 }
139 inline void ttg_finalize() {
140 ttg::detail::set_default_world(ttg::World{}); // reset the default world
141 ttg::detail::destroy_worlds<ttg_madness::WorldImpl>();
142 ::madness::finalize();
143 }
145 inline void ttg_abort() {
146 MPI_Abort(ttg_default_execution_context().impl().impl().mpi.Get_mpi_comm(), 1);
147 assert(0); // make sure we abort
148 }
149 inline void ttg_execute(ttg::World world) {
150 // World executes tasks eagerly
151 }
152 inline void ttg_fence(ttg::World world) { world.impl().fence(); }
153
154 template <typename T>
155 inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
156 world.impl().register_ptr(ptr);
157 }
158
159 template <typename T>
160 inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
161 world.impl().register_ptr(std::move(ptr));
162 }
163
164 inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
165 world.impl().register_status(status_ptr);
166 }
167
168 template <typename Callback>
169 inline void ttg_register_callback(ttg::World world, Callback &&callback) {
170 world.impl().register_callback(std::forward<Callback>(callback));
171 }
172
173 inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
174
175 template <typename T>
176 inline void ttg_sum(ttg::World world, T &value) {
177 world.impl().impl().gop.sum(value);
178 }
181 template <typename T>
182 inline void ttg_broadcast(ttg::World world, T &data, int source_rank) {
183 world.impl().impl().gop.broadcast_serializable(data, source_rank);
184 }
185
194 template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
195 class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs, Space>> {
196 static_assert(Space == ttg::ExecutionSpace::Host, "MADNESS backend only supports Host Execution Space");
198 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
199 using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
200 // create a virtual control input if the input list is empty, to be used in invoke()
201 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
203
204 public:
205 using ttT = TT;
206 using key_type = keyT;
208 static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
209
210 private:
211 ttg::World world;
212 ttg::meta::detail::keymap_t<keyT> keymap;
213 ttg::meta::detail::keymap_t<keyT> priomap;
214 // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
215 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
216 input_reducers;
217 int num_pullins = 0;
218
219 std::array<std::size_t, std::tuple_size_v<actual_input_tuple_type>> static_streamsize;
220
221 public:
223
// The MADNESS backend is host-only (see the static_assert on Space), so no
// device-specific task body is ever provided: all of these are constant false.

/// @return false: no CUDA task body under the MADNESS backend
static constexpr bool derived_has_cuda_op() { return false; }

/// @return false: no HIP task body under the MADNESS backend
static constexpr bool derived_has_hip_op() { return false; }

/// @return false: no Level Zero task body under the MADNESS backend
static constexpr bool derived_has_level_zero_op() { return false; }

/// @return false: no device task body of any kind under the MADNESS backend
static constexpr bool derived_has_device_op() { return false; }
243
244 protected:
245 using worldobjT = ::madness::WorldObject<ttT>;
246
247 static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
248 static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
249 static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
250
251 // This to support tt fusion
252 inline static __thread struct {
253 uint64_t key_hash = 0; // hash of current key
254 size_t call_depth = 0; // how deep calls are nested
256
257 public:
260 static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
262 "at most one void input can be handled, and it must come last");
263 // if have data inputs and (always last) control input, convert last input to Void to make logic easier
268
269 using input_args_type = actual_input_tuple_type;
270
274
277
278 protected:
279 template <std::size_t i, typename resultT, typename InTuple>
281 return static_cast<resultT>(std::get<i>(std::forward<InTuple>(intuple)));
282 };
/// Accesses element @p i of a tuple, preserving the tuple's value category.
/// Lvalue tuples yield `T&` (or `const T&` for const tuples), exactly as before.
/// Rvalue tuples now yield `T&&`; the former `auto&` return type made such calls ill-formed.
/// @tparam i index of the element
/// @param intuple the tuple to access
/// @return the result of `std::get<i>` applied to the forwarded tuple
template <std::size_t i, typename InTuple>
static decltype(auto) get(InTuple &&intuple) {
  return std::get<i>(std::forward<InTuple>(intuple));
}
287
288 private:
289 input_terminals_type input_terminals;
290 output_terminalsT output_terminals;
291
292 protected:
293 const auto &get_output_terminals() const { return output_terminals; }
294
295 private:
296 struct TTArgs : ::madness::TaskInterface {
297 private:
298 using TaskInterface = ::madness::TaskInterface;
299
300 public:
301 int counter; // Tracks the number of arguments finalized
302 std::array<std::int64_t, numins>
303 nargs; // Tracks the number of expected values minus the number of received values
304 // 0 = finalized
305 // for a streaming input initialized to std::numeric_limits<std::int64_t>::max()
306 // which indicates that the value needs to be initialized
307 std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
308 // inputs (0 = unbounded stream, >0 = bounded stream)
309 input_values_tuple_type input_values; // The input values (does not include control)
310 derivedT *derived; // Pointer to derived class instance
311 bool pull_terminals_invoked = false;
312 std::conditional_t<ttg::meta::is_void_v<keyT>, ttg::Void, keyT> key; // Task key
313#ifdef TTG_HAVE_COROUTINE
314 void *suspended_task_address = nullptr; // if not null the function is suspended
316#endif // TTG_HAVE_COROUTINE
317
319 template <typename Tuple, std::size_t... Is>
320 static input_refs_tuple_type make_input_refs_impl(Tuple &&inputs, std::index_sequence<Is...>) {
322 get<Is, std::tuple_element_t<Is, input_refs_tuple_type>>(std::forward<Tuple>(inputs))...};
323 }
324
326 input_refs_tuple_type make_input_refs() {
327 return make_input_refs_impl(this->input_values,
328 std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{});
329 }
330
331 TTArgs(int prio = 0)
332 : TaskInterface(TaskAttributes(prio ? TaskAttributes::HIGHPRIORITY : 0))
333 , counter(numins)
334 , nargs()
335 , stream_size()
336 , input_values() {
337 std::fill(nargs.begin(), nargs.end(), std::numeric_limits<std::int64_t>::max());
338 }
339
340 virtual void run(::madness::World &world) override {
341 using ttg::hash;
342 ttT::threaddata.key_hash = hash<decltype(key)>{}(key);
343 ttT::threaddata.call_depth++;
344
345 void *suspended_task_address =
346#ifdef TTG_HAVE_COROUTINE
347 this->suspended_task_address; // non-null = need to resume the task
348#else // TTG_HAVE_COROUTINE
349 nullptr;
350#endif // TTG_HAVE_COROUTINE
351 if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
352 // ttg::print("starting task");
355 suspended_task_address,
356 coroutine_id,
357 derived->op(key, this->make_input_refs(),
358 derived->output_terminals)); // !!! NOTE converting input values to refs
360 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
363 suspended_task_address,
364 coroutine_id,
365 derived->op(this->make_input_refs(),
366 derived->output_terminals)); // !!! NOTE converting input values to refs
368 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
369 } else // unreachable
370 ttg::abort();
371 } else { // resume suspended coroutine
372#ifdef TTG_HAVE_COROUTINE
374 assert(ret.ready());
375 ret.resume();
376 if (ret.completed()) {
377 ret.destroy();
378 suspended_task_address = nullptr;
379 } else { // not yet completed
380 // leave suspended_task_address as is
381 }
382 this->suspended_task_address = suspended_task_address;
383#else // TTG_HAVE_COROUTINE
384 ttg::abort(); // should not happen
385#endif // TTG_HAVE_COROUTINE
386 }
387
388 ttT::threaddata.call_depth--;
389
390 // if (suspended_task_address == nullptr) {
391 // ttg::print("finishing task",ttT::threaddata.call_depth);
392 // }
393
394#ifdef TTG_HAVE_COROUTINE
395 if (suspended_task_address) {
396 // TODO implement handling of suspended coroutines properly
397
398 // only resumable_task is recognized at the moment
400
401 // right now can events are not properly implemented, we are only testing the workflow with dummy events
402 // so mark the events finished manually
403 // proper thing to do is to process event queue and resubmit this task again
404 auto events =
405 static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
406 for (auto &event_ptr : events) {
407 event_ptr->finish();
408 }
410
411 // resume the coroutine
413 assert(ret.ready());
414 ret.resume();
415 if (ret.completed()) {
416 ret.destroy();
417 suspended_task_address = nullptr;
418 } else { // not yet completed
419 ttg::abort();
420 }
421 }
422#endif // TTG_HAVE_COROUTINE
423 }
424
425 virtual ~TTArgs() {} // Will be deleted via TaskInterface*
426
427 private:
428 ::madness::Spinlock lock_; // synchronizes access to data
429 public:
430 void lock() { lock_.lock(); }
431 void unlock() { lock_.unlock(); }
432 };
433
434 using hashable_keyT = std::conditional_t<ttg::meta::is_void_v<keyT>, int, keyT>;
435 using cacheT = ::madness::ConcurrentHashMap<hashable_keyT, TTArgs *, ttg::hash<hashable_keyT>>;
436 using accessorT = typename cacheT::accessor;
437 cacheT cache;
438
439 protected:
440 template <typename terminalT, std::size_t i, typename Key>
441 void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args) {
442 if (in.is_pull_terminal) {
443 int owner;
444 if constexpr (!ttg::meta::is_void_v<Key>) {
445 owner = in.container.owner(key);
446 } else {
447 owner = in.container.owner();
448 }
449
450 if (owner != world.rank()) {
452 } else {
453 if constexpr (!ttg::meta::is_void_v<Key>) {
454 auto value = (in.container).get(key);
455 if (args->nargs[i] == 0) {
456 ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
457 ": error argument is already finalized : ", i);
458 throw std::runtime_error("Op::set_arg called for a finalized stream");
459 }
460
461 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
462 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
463 args->nargs[i] = 0;
464 args->counter--;
465 }
466 } else {
467 auto value = (in.container).get();
468 if (args->nargs[i] == 0) {
469 ::ttg::print_error(world.rank(), ":", get_name(), " : ", key,
470 ": error argument is already finalized : ", i);
471 throw std::runtime_error("Op::set_arg called for a finalized stream");
472 }
473
474 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
475 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
476 args->nargs[i] = 0;
477 args->counter--;
478 }
479 }
480 }
481 }
482 }
483
484 template <std::size_t i, typename Key>
485 void get_terminal_data(const int owner, const Key &key) {
486 if (owner != world.rank()) {
487 worldobjT::send(owner, &ttT::template get_terminal_data<i, Key>, owner, key);
488 } else {
489 auto &in = std::get<i>(input_terminals);
490 if constexpr (!ttg::meta::is_void_v<Key>) {
491 auto value = (in.container).get(key);
492 worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
493 key, value);
494 } else {
495 auto value = (in.container).get();
496 worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
497 value);
498 }
499 }
500 }
501
502 template <std::size_t... IS, typename Key = keyT>
503 void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, TTArgs *args) {
505 std::get<IS>(input_terminals), key, args),
506 0)...};
507 junk[0]++;
508 }
509
510 // there are 6 types of set_arg:
511 // - case 1: nonvoid Key, complete Value type
512 // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
513 // - case 3: nonvoid Key, void Value, no inputs
514 // - case 4: void Key, complete Value type
515 // - case 5: void Key, void Value, mixed (data+control) inputs
516 // - case 6: void Key, void Value, no inputs
517 // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
518
519 // case 1:
520 template <std::size_t i, typename Key, typename Value>
521 void set_arg(const Key &key, Value &&value) {
522 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>; // Should be T or const T
523 static_assert(std::is_same_v<std::decay_t<Value>, std::decay_t<valueT>>,
524 "TT::set_arg(key,value) given value of type incompatible with TT");
525
526 int owner;
527 if constexpr (!ttg::meta::is_void_v<Key>) {
528 owner = keymap(key);
529 } else {
530 owner = keymap();
531 }
532
533 if (owner != world.rank()) {
534 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding setting argument : ", i);
535 // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
536 // BUT compiler vomits when const std::remove_reference_t<Value>& -> std::decay_t<Value>
537 // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
538 // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
539 // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
540 // move arguments) and locally
541 // here we know that this will be a remove execution, so we prepare to take rvalues;
542 // send_am will need to separate local and remote paths to deal with this
543 if constexpr (!ttg::meta::is_void_v<Key>) {
544 if constexpr (!ttg::meta::is_void_v<Value>) {
545 worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
546 } else {
547 worldobjT::send(owner, &ttT::template set_arg<i, Key, void>, key);
548 }
549 } else {
550 if constexpr (!ttg::meta::is_void_v<Value>) {
551 worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
552 } else {
553 worldobjT::send(owner, &ttT::template set_arg<i, void, void>);
554 }
555 }
556 } else {
557 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
558
559 bool pullT_invoked = false;
560 accessorT acc;
561
562 int prio;
563 if constexpr (!ttg::meta::is_void_v<Key>) {
564 prio = this->priomap(key);
565 if (cache.insert(acc, key)) {
566 acc->second = new TTArgs(prio); // It will be deleted by the task q
567 if (!is_lazy_pull()) {
568 // Invoke pull terminals for only the terminals with non-void values.
569 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
570 acc->second);
571 pullT_invoked = true;
572 }
573 }
574 } else {
575 prio = this->priomap();
576 if (cache.insert(acc, 0)) acc->second = new TTArgs(prio); // It will be deleted by the task q
577 }
578
579 TTArgs *args = acc->second;
580 if (!is_lazy_pull() && pullT_invoked) args->pull_terminals_invoked = true;
581
582 if (args->nargs[i] == 0) {
583 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
584 throw std::runtime_error("TT::set_arg called for a finalized stream");
585 }
586
587 const auto &reducer = std::get<i>(input_reducers);
588 if (reducer) { // is this a streaming input? reduce the received value
589 // N.B. Right now reductions are done eagerly, without spawning tasks
590 // this means we must lock
591 args->lock();
592
593 bool initialize_not_reduce = false;
594 if (args->nargs[i] == std::numeric_limits<std::int64_t>::max()) {
595 // upon first datum initialize, if needed
596 if constexpr (!ttg::meta::is_void_v<valueT>) {
598 }
599
600 // initialize nargs
601 // if we have a stream size for the op, use it first
602 if (args->stream_size[i] != 0) {
603 assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
604 args->nargs[i] = args->stream_size[i];
605 } else if (static_streamsize[i] != 0) {
606 assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
607 args->stream_size[i] = static_streamsize[i];
608 args->nargs[i] = static_streamsize[i];
609 } else {
610 args->nargs[i] = 0;
611 }
612 }
613
614 if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
616 this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
617 else
618 reducer(this->get<i, std::decay_t<valueT> &>(args->input_values), value);
619 } else {
620 reducer(); // even if this was a control input, must execute the reducer for possible side effects
621 }
622
623 // update the counter
624 args->nargs[i]--;
625
626 // is this the last message?
627 if (args->nargs[i] == 0) args->counter--;
628
629 args->unlock();
630 } else { // this is a nonstreaming input => set the value
631 if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
632 this->get<i, std::decay_t<valueT> &>(args->input_values) = std::forward<Value>(value);
633 }
634 args->nargs[i] = 0;
635 args->counter--;
636 }
637
638 // If lazy pulling in enabled, check it here.
639 if (numins - args->counter == num_pullins) {
640 if (is_lazy_pull() && !args->pull_terminals_invoked) {
641 // Invoke pull terminals for only the terminals with non-void values.
642 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
643 }
644 }
645
646 // ready to run the task?
647 if (args->counter == 0) {
648 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
649 args->derived = static_cast<derivedT *>(this);
650 args->key = key;
651
652 using ttg::hash;
653 auto curhash = hash<keyT>{}(key);
654
655 if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
656
657 // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
658 ttT::threaddata.call_depth++;
660 static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
662 static_cast<derivedT *>(this)->op(key, output_terminals); // Runs immediately
664 static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
666 static_cast<derivedT *>(this)->op(output_terminals); // Runs immediately
667 } else
668 ttg::abort();
669 ttT::threaddata.call_depth--;
670
671 } else {
672 // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
673 world.impl().impl().taskq.add(args);
674 }
675
676 cache.erase(acc);
677 }
678 }
679 }
680
681 // case 2 and 3
682 template <std::size_t i, typename Key, typename Value>
683 std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
684 set_arg<i>(key, ttg::Void{});
685 }
686
687 // case 4
688 template <std::size_t i, typename Key = keyT, typename Value>
689 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
690 return set_arg<i>(ttg::Void{}, std::forward<Value>(value));
691 }
692
693 // case 5 and 6
694 template <std::size_t i, typename Key = keyT, typename Value>
695 std::enable_if_t<ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg() {
697 }
698
699 // Used by invoke to set all arguments associated with a task
700 // Is: index sequence of elements in args
701 // Js: index sequence of input terminals to set
702 template <typename Key, typename... Ts, size_t... Is, size_t... Js>
703 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
704 const Key &key, const std::tuple<Ts...> &args) {
705 static_assert(sizeof...(Js) == sizeof...(Is));
706 constexpr std::size_t js[] = {Js...};
707 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
708 junk[0]++;
709 }
710
711 // Used by invoke to set all arguments associated with a task
712 // Is: index sequence of input terminals to set
713 template <typename Key, typename... Ts, size_t... Is>
714 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
715 const std::tuple<Ts...> &args) {
716 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
717 }
718
719 // Used by invoke to set all arguments associated with a task
720 // Is: index sequence of elements in args
721 // Js: index sequence of input terminals to set
722 template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
723 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
724 const std::tuple<Ts...> &args) {
725 static_assert(sizeof...(Js) == sizeof...(Is));
726 constexpr std::size_t js[] = {Js...};
727 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
728 junk[0]++;
729 }
730
731 // Used by invoke to set all arguments associated with a task
732 // Is: index sequence of input terminals to set
733 template <typename Key = keyT, typename... Ts, size_t... Is>
734 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
735 const std::tuple<Ts...> &args) {
736 set_args(std::index_sequence_for<Ts...>{}, is, args);
737 }
738
739 public:
742 template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
743 std::enable_if_t<key_is_void, void> set_argstream_size(std::size_t size) {
744 // preconditions
745 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
746 assert(size > 0 && "TT::set_argstream_size(size) called with size=0");
747
748 // body
749 const auto owner = keymap();
750 if (owner != world.rank()) {
751 ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
752 worldobjT::send(owner, &ttT::template set_argstream_size<i, true>, size);
753 } else {
754 ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
755
756 accessorT acc;
757 if (cache.insert(acc, 0)) acc->second = new TTArgs(); // It will be deleted by the task q
758 TTArgs *args = acc->second;
759
760 args->lock();
761
762 // check if stream is already bounded
763 if (args->stream_size[i] > 0) {
764 ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
765 throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
766 }
767
768 // check if stream is already finalized
769 if (args->nargs[i] == 0) {
770 ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
771 throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
772 }
773
774 // commit changes
775 args->stream_size[i] = size;
776 // if messages already received for this key update the expected-received counter
777 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
779 // cannot have received more messages than expected
780 if (-(args->nargs[i]) > size) {
781 ttg::print_error(world.rank(), ":", get_name(),
782 " : error stream received more messages than specified via set_argstream_size : ", i);
783 throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
784 }
785 args->nargs[i] += size;
786 }
787 // if done, update the counter
788 if (args->nargs[i] == 0) args->counter--;
789 args->unlock();
790
791 // ready to run the task?
792 if (args->counter == 0) {
793 ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
794 args->derived = static_cast<derivedT *>(this);
795
796 world.impl().impl().taskq.add(args);
797
798 cache.erase(acc);
799 }
800 }
801 }
802
803 template <std::size_t i>
804 void set_static_argstream_size(std::size_t size) {
805 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
806 assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
807
808 ttg::trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
809
810 // Check if stream is already bounded
811 if (static_streamsize[i] > 0) {
812 ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
813 throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
814 }
815
816 // commit changes
817 static_streamsize[i] = size;
818 }
819
824 template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
825 std::enable_if_t<!key_is_void, void> set_argstream_size(const Key &key, std::size_t size) {
826 // preconditions
827 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
828 assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
829
830 // body
831 const auto owner = keymap(key);
832 if (owner != world.rank()) {
833 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
834 worldobjT::send(owner, &ttT::template set_argstream_size<i>, key, size);
835 } else {
836 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
837
838 accessorT acc;
839 if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
840 TTArgs *args = acc->second;
841
842 args->lock();
843
844 // check if stream is already bounded
845 if (args->stream_size[i] > 0) {
846 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
847 throw std::runtime_error("TT::set_argstream_size called for a bounded stream");
848 }
849
850 // check if stream is already finalized
851 if (args->nargs[i] == 0) {
852 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
853 throw std::runtime_error("TT::set_argstream_size called for a finalized stream");
854 }
855
856 // commit changes
857 args->stream_size[i] = size;
858 // if messages already received for this key update the expected-received counter
859 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
860 if (messages_received_already) args->nargs[i] += size;
861 // if done, update the counter
862 if (args->nargs[i] == 0) args->counter--;
863
864 args->unlock();
865
866 // ready to run the task?
867 if (args->counter == 0) {
868 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
869 args->derived = static_cast<derivedT *>(this);
870 args->key = key;
871
872 world.impl().impl().taskq.add(args);
873
874 cache.erase(acc);
875 }
876 }
877 }
878
880 template <std::size_t i, typename Key = keyT, bool key_is_void = ttg::meta::is_void_v<Key>>
881 std::enable_if_t<!key_is_void, void> finalize_argstream(const Key &key) {
882 // preconditions
883 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
884
885 // body
886 const auto owner = keymap(key);
887 if (owner != world.rank()) {
888 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
889 worldobjT::send(owner, &ttT::template finalize_argstream<i>, key);
890 } else {
891 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
892
893 accessorT acc;
894 const auto found = cache.find(acc, key);
895 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
897 TTArgs *args = acc->second;
898
899 // check if stream is already bounded
900 if (args->stream_size[i] > 0) {
901 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
902 throw std::runtime_error("TT::finalize called for a bounded stream");
903 }
904
905 // check if stream is already finalized
906 if (args->nargs[i] == 0) {
907 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
908 throw std::runtime_error("TT::finalize called for a finalized stream");
909 }
910
911 // commit changes
912 args->nargs[i] = 0;
913 args->counter--;
914 // ready to run the task?
915 if (args->counter == 0) {
916 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": submitting task for op ");
917 args->derived = static_cast<derivedT *>(this);
918 args->key = key;
919
920 world.impl().impl().taskq.add(args);
921 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
922
923 cache.erase(acc);
924 }
925 }
926 }
927
929 template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
930 std::enable_if_t<key_is_void, void> finalize_argstream() {
931 // preconditions
932 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
933
934 // body
935 const int owner = keymap();
936 if (owner != world.rank()) {
937 ttg::trace(world.rank(), ":", get_name(), " : forwarding stream finalize for terminal ", i);
939 } else {
940 ttg::trace(world.rank(), ":", get_name(), " : finalizing stream for terminal ", i);
941
942 accessorT acc;
943 const auto found = cache.find(acc, 0);
944 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
946 TTArgs *args = acc->second;
947
948 // check if stream is already bounded
949 if (args->stream_size[i] > 0) {
950 ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
951 throw std::runtime_error("TT::finalize called for a bounded stream");
952 }
953
954 // check if stream is already finalized
955 if (args->nargs[i] == 0) {
956 ttg::print_error(world.rank(), ":", get_name(), " : error stream is already finalized : ", i);
957 throw std::runtime_error("TT::finalize called for a finalized stream");
958 }
959
960 // commit changes
961 args->nargs[i] = 0;
962 args->counter--;
963 // ready to run the task?
964 if (args->counter == 0) {
965 ttg::trace(world.rank(), ":", get_name(), " : submitting task for op ");
966 args->derived = static_cast<derivedT *>(this);
967
968 world.impl().impl().taskq.add(args);
969 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
970
971 cache.erase(acc);
972 }
973 }
974 }
975
976 private:
977 // Copy/assign/move forbidden ... we could make it work using
978 // PIMPL for this base class. However, this instance of the base
979 // class is tied to a specific instance of a derived class a
980 // pointer to which is captured for invoking derived class
981 // functions. Thus, not only does the derived class have to be
982 // involved but we would have to do it in a thread safe way
983 // including for possibly already running tasks and remote
984 // references. This is not worth the effort ... wherever you are
985 // wanting to move/assign an TT you should be using a pointer.
986 TT(const TT &other) = delete;
987 TT &operator=(const TT &other) = delete;
988 TT(TT &&other) = delete;
989 TT &operator=(TT &&other) = delete;
990
991 // Registers the callback for the i'th input terminal
992 template <typename terminalT, std::size_t i>
993 void register_input_callback(terminalT &input) {
994 static_assert(std::is_same_v<keyT, typename terminalT::key_type>,
995 "TT::register_input_callback(terminalT) -- incompatible terminalT");
996 using valueT = std::decay_t<typename terminalT::value_type>;
997
998 if (input.is_pull_terminal) {
999 num_pullins++;
1000 }
1001
1003 // case 1: nonvoid key, nonvoid value
1006 !std::is_void_v<valueT>) {
1007 auto move_callback = [this](const keyT &key, valueT &&value) {
1008 set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
1009 };
1010 auto send_callback = [this](const keyT &key, const valueT &value) {
1012 };
1013 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1014 auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1015 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1016 }
1018 // case 4: void key, nonvoid value
1021 !std::is_void_v<valueT>) {
1022 auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1023 auto send_callback = [this](const valueT &value) { set_arg<i, keyT, const valueT &>(value); };
1024 auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1025 auto finalize_callback = [this]() { finalize_argstream<i>(); };
1026 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback);
1027 }
1029 // case 2: nonvoid key, void value, mixed inputs
1030 // case 3: nonvoid key, void value, no inputs
1032 else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
1033 auto send_callback = [this](const keyT &key) { set_arg<i, keyT, void>(key); };
1034 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1035 auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
1036 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1037 }
1039 // case 5: void key, void value, mixed inputs
1040 // case 6: void key, void value, no inputs
1042 else if constexpr (ttg::meta::is_all_void_v<keyT, valueT> && std::is_void_v<valueT>) {
1043 auto send_callback = [this]() { set_arg<i, keyT, void>(); };
1044 auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
1045 auto finalize_callback = [this]() { finalize_argstream<i>(); };
1046 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
1047 } else
1048 ttg::abort();
1049 }
1050
1051 template <std::size_t... IS>
1052 void register_input_callbacks(std::index_sequence<IS...>) {
1053 int junk[] = {
1054 0,
1056 0)...};
1057 junk[0]++;
1058 }
1059
1060 template <std::size_t... IS, typename inedgesT>
1061 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1062 static_assert(sizeof...(IS) == std::tuple_size_v<input_terminals_type>);
1063 static_assert(std::tuple_size_v<inedgesT> == std::tuple_size_v<input_terminals_type>);
1064 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
1065 junk[0]++;
1066 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1067 " Edges");
1068 }
1069
1070 template <std::size_t... IS, typename outedgesT>
1071 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1072 static_assert(sizeof...(IS) == numouts);
1073 static_assert(std::tuple_size_v<outedgesT> == numouts);
1074 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
1075 junk[0]++;
1076 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1077 " Edges");
1078 }
1079
1080 public:
1081 template <typename keymapT = ttg::detail::default_keymap<keyT>,
1082 typename priomapT = ttg::detail::default_priomap<keyT>>
1083 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1085 : ttg::TTBase(name, numinedges, numouts)
1086 , static_streamsize()
1087 , worldobjT(world.impl().impl())
1088 , world(world)
1089 // if using default keymap, rebind to the given world
1090 , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1091 ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1092 : decltype(keymap)(std::forward<keymapT>(keymap_)))
1093 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1094 // Cannot call these in base constructor since terminals not yet constructed
1095 if (innames.size() != numinedges) {
1096 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1097 numinedges);
1098 throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1099 }
1100 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1101
1102 register_input_terminals(input_terminals, innames);
1103 register_output_terminals(output_terminals, outnames);
1104
1105 register_input_callbacks(std::make_index_sequence<numinedges>{});
1106 }
1107
1108 template <typename keymapT = ttg::detail::default_keymap<keyT>,
1109 typename priomapT = ttg::detail::default_priomap<keyT>>
1110 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1111 keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1112 : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
1113 std::forward<priomapT>(priomap)) {}
1114
1115 template <typename keymapT = ttg::detail::default_keymap<keyT>,
1116 typename priomapT = ttg::detail::default_priomap<keyT>>
1117 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1118 const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1120 : ttg::TTBase(name, numinedges, numouts)
1121 , static_streamsize()
1122 , worldobjT(ttg::default_execution_context().impl().impl())
1123 , world(ttg::default_execution_context())
1124 // if using default keymap, rebind to the given world
1125 , keymap(std::is_same_v<keymapT, ttg::detail::default_keymap<keyT>>
1126 ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
1127 : decltype(keymap)(std::forward<keymapT>(keymap_)))
1128 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
1129 // Cannot call in base constructor since terminals not yet constructed
1130 if (innames.size() != numinedges) {
1131 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1132 numinedges);
1133 throw this->get_name() + ":madness::ttg::TT: #input names != #input terminals";
1134 }
1135 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1136
1137 register_input_terminals(input_terminals, innames);
1138 register_output_terminals(output_terminals, outnames);
1139
1140 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
1141 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
1142 // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1143 register_input_callbacks(std::make_index_sequence<numinedges>{});
1144 }
1145
1146 template <typename keymapT = ttg::detail::default_keymap<keyT>,
1147 typename priomapT = ttg::detail::default_priomap<keyT>>
1148 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1149 const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1150 keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
1151 : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
1152 std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
1153
1154 // Destructor checks for unexecuted tasks
1155 virtual ~TT() {
1156 if (cache.size() != 0) {
1157 std::cerr << world.rank() << ":"
1158 << "warning: unprocessed tasks in destructor of operation '" << get_name()
1159 << "' (class name = " << get_class_name() << ")" << std::endl;
1160 std::cerr << world.rank() << ":"
1161 << " T => argument assigned F => argument unassigned" << std::endl;
1162 int nprint = 0;
1163 for (auto item : cache) {
1164 if (nprint++ > 10) {
1165 std::cerr << " etc." << std::endl;
1166 break;
1167 }
1168 using ::madness::operators::operator<<;
1169 std::cerr << world.rank() << ":"
1170 << " unused: " << item.first << " : ( ";
1171 for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1172 std::cerr << ")" << std::endl;
1173 }
1174 ttg::abort();
1175 }
1176 }
1177
1183 template <std::size_t i, typename Reducer>
1185 ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
1186 std::get<i>(input_reducers) = reducer;
1187 }
1188
1196 template <std::size_t i, typename Reducer>
1197 void set_input_reducer(Reducer &&reducer, std::size_t size) {
1198 set_input_reducer<i>(std::forward<Reducer>(reducer));
1200 }
1201
1202 template <typename Keymap>
1204 keymap = km;
1205 }
1206
1207 auto get_priomap(void) const { return priomap; }
1208
1212 template <typename Priomap>
1214 priomap = std::forward<Priomap>(pm);
1215 }
1216
/// Accepts a constraint; currently a no-op in this (MADNESS) backend, so @p c is ignored.
template <typename Constraint>
void add_constraint([[maybe_unused]] Constraint &&c) {
  /* currently a noop */
}
1223
/// Accepts a shared constraint and a key mapper; currently a no-op in this (MADNESS) backend.
template <typename Constraint, typename Mapper>
void add_constraint([[maybe_unused]] std::shared_ptr<Constraint> c, [[maybe_unused]] Mapper &&map) {
  /* currently a noop */
}
1228
/// Accepts a constraint (by value) and a key mapper; currently a no-op in this (MADNESS) backend.
template <typename Constraint, typename Mapper>
void add_constraint([[maybe_unused]] Constraint c, [[maybe_unused]] Mapper &&map) {
  /* currently a noop */
}
1233
1235 void make_executable() override {
1236 TTBase::make_executable();
1237 this->process_pending();
1238 }
1239
1241
1246 void fence() override { ttg_fence(world); }
1247
1249 template <std::size_t i>
1250 std::tuple_element_t<i, input_terminals_type> *in() {
1251 return &std::get<i>(input_terminals);
1252 }
1253
1255 template <std::size_t i>
1256 std::tuple_element_t<i, output_terminalsT> *out() {
1257 return &std::get<i>(output_terminals);
1258 }
1259
1261 template <typename Key = keyT>
1262 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1263 const Key &key, const input_values_tuple_type &args) {
1265 if constexpr(!std::is_same_v<Key, key_type>) {
1266 key_type k = key; /* cast that type into the key type we know */
1267 invoke(k, args);
1268 } else {
1269 /* trigger non-void inputs */
1271 /* trigger void inputs */
1273 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1274 }
1275 }
1276
1278 template <typename Key = keyT>
1279 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1280 const input_values_tuple_type &args) {
1282 /* trigger non-void inputs */
1284 /* trigger void inputs */
1286 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1287 }
1288
1290 template <typename Key = keyT>
1291 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1292 const Key &key) {
1294 if constexpr(!std::is_same_v<Key, key_type>) {
1295 key_type k = key; /* cast that type into the key type we know */
1296 invoke(k);
1297 } else {
1298 /* trigger void inputs */
1300 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
1301 }
1302 }
1303
1305 template <typename Key = keyT>
1306 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1308 /* trigger void inputs */
1310 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
1311 }
1312
1313 void invoke() override {
1315 invoke<keyT>();
1316 else
1317 TTBase::invoke();
1318 }
1319
/// Accepted for interface compatibility; the argument is ignored and nothing is stored.
void set_defer_writer([[maybe_unused]] bool _) {
}
1321
/// Always reports false; the argument is ignored.
bool get_defer_writer([[maybe_unused]] bool _) {
  return false;
}
1323
1326 const decltype(keymap) &get_keymap() const { return keymap; }
1327
1331 template <typename Key>
1332 std::enable_if_t<!ttg::meta::is_void_v<Key>, int> owner(const Key &key) const {
1333 return keymap(key);
1334 }
1335
1338 template <typename Key>
1339 std::enable_if_t<ttg::meta::is_void_v<Key>, int> owner() const {
1340 return keymap();
1341 }
1342 };
1343
1344#include "ttg/make_tt.h"
1345
1346} // namespace ttg_madness
1347
1348#include "ttg/madness/watch.h"
1349#include "ttg/madness/buffer.h"
1350#include "ttg/madness/ttvalue.h"
1351
1352#endif // MADNESS_TTG_H_INCLUDED
#define TTG_OP_ASSERT_EXECUTABLE()
Definition tt.h:278
Edge is used to connect In and Out terminals.
Definition edge.h:26
A base class for all template tasks.
Definition tt.h:32
std::string get_class_name() const
Gets the demangled class name (uses RTTI)
Definition tt.h:222
TTBase(TTBase &&other)
Definition tt.h:117
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition tt.h:86
const std::string & get_name() const
Gets the name of this operation.
Definition tt.h:219
bool is_lazy_pull()
Definition tt.h:201
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition tt.h:93
A complete version of void.
Definition void.h:12
int rank() const
Definition world.h:205
WorldImplT & impl(void)
Definition world.h:217
Base class for implementation-specific Worlds.
Definition world.h:34
void release_ops(void)
Definition world.h:55
WorldImplBase(int size, int rank)
Definition world.h:62
bool is_valid(void) const
Definition world.h:155
void set_keymap(Keymap &&km)
Definition ttg.h:1203
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition ttg.h:258
actual_input_tuple_type input_args_type
Definition ttg.h:269
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition ttg.h:695
void set_priomap(Priomap &&pm)
Definition ttg.h:1213
void set_defer_writer(bool _)
Definition ttg.h:1320
void set_arg(const Key &key, Value &&value)
Definition ttg.h:521
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition ttg.h:1246
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition ttg.h:1197
void add_constraint(Constraint c, Mapper &&map)
Definition ttg.h:1230
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition ttg.h:689
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition ttg.h:1083
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition ttg.h:259
static resultT get(InTuple &&intuple)
Definition ttg.h:280
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition ttg.h:1225
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition ttg.h:1256
static constexpr bool derived_has_cuda_op()
Definition ttg.h:225
static constexpr bool derived_has_level_zero_op()
Definition ttg.h:235
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition ttg.h:683
const auto & get_output_terminals() const
Definition ttg.h:293
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition ttg.h:1250
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition ttg.h:1306
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition ttg.h:1117
static constexpr int numouts
Definition ttg.h:249
static auto & get(InTuple &&intuple)
Definition ttg.h:284
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition ttg.h:1148
void get_terminal_data(const int owner, const Key &key)
Definition ttg.h:485
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition ttg.h:267
void set_input_reducer(Reducer &&reducer)
Definition ttg.h:1184
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition ttg.h:1262
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition ttg.h:1279
virtual ~TT()
Definition ttg.h:1155
size_t call_depth
Definition ttg.h:254
static constexpr int numinedges
Definition ttg.h:247
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition ttg.h:265
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition ttg.h:714
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition ttg.h:723
output_terminalsT output_terminals_type
Definition ttg.h:275
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition ttg.h:930
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition ttg.h:271
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition ttg.h:276
static constexpr bool derived_has_hip_op()
Definition ttg.h:230
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition ttg.h:272
uint64_t key_hash
Definition ttg.h:253
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition ttg.h:1339
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition ttg.h:743
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition ttg.h:1110
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition ttg.h:503
void invoke() override
Definition ttg.h:1313
void set_static_argstream_size(std::size_t size)
Definition ttg.h:804
void make_executable() override
implementation of TTBase::make_executable()
Definition ttg.h:1235
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition ttg.h:1291
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition ttg.h:734
keyT key_type
Definition ttg.h:206
static constexpr bool derived_has_device_op()
Definition ttg.h:240
::madness::WorldObject< ttT > worldobjT
Definition ttg.h:245
auto get_priomap(void) const
Definition ttg.h:1207
const decltype(keymap) & get_keymap() const
Definition ttg.h:1326
void add_constraint(Constraint &&c)
Definition ttg.h:1220
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition ttg.h:881
ttg::World get_world() const override final
Definition ttg.h:222
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition ttg.h:1332
static __thread struct ttg_madness::TT::@0 threaddata
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition ttg.h:441
static constexpr int numins
Definition ttg.h:248
bool get_defer_writer(bool _)
Definition ttg.h:1322
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition ttg.h:825
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition ttg.h:703
WorldImpl(const WorldImpl &other)=delete
WorldImpl & operator=(const WorldImpl &other)=delete
const ttg::Edge & ctl_edge() const
Definition ttg.h:105
virtual void fence_impl(void) override
Definition ttg.h:101
::madness::World & impl()
Definition ttg.h:120
WorldImpl & operator=(WorldImpl &&other)=delete
ttg::Edge & ctl_edge()
Definition ttg.h:103
virtual ~WorldImpl() override
Definition ttg.h:93
WorldImpl(WorldImpl &&other)=delete
WorldImpl(::madness::World &world)
Definition ttg.h:82
const ::madness::World & impl() const
Definition ttg.h:122
virtual void destroy(void) override
Definition ttg.h:107
WorldImpl(const SafeMPI::Intracomm &comm)
Definition ttg.h:84
#define TTGUNUSED(x)
Definition macro.h:7
STL namespace.
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition terminal.h:355
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition env.cpp:16
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition edge.h:200
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition typelist.h:53
constexpr auto get(typelist< T, RestOfTs... >)
Definition typelist.h:102
this contains MADNESS-based TTG functionality
Definition fwd.h:17
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition ttg.h:155
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition ttg.h:131
void ttg_execute(ttg::World world)
Definition ttg.h:149
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition ttg.h:173
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void > > &status_ptr)
Definition ttg.h:164
void ttg_sum(ttg::World world, T &value)
Definition ttg.h:176
void ttg_fence(ttg::World world)
Definition ttg.h:152
void ttg_finalize()
Definition ttg.h:139
void make_executable_hook(ttg::World &)
Definition ttg.h:129
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition ttg.h:169
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition ttg.h:182
void ttg_abort()
Definition ttg.h:145
ttg::World ttg_default_execution_context()
Definition ttg.h:144
top-level TTG namespace contains runtime-neutral functionality
Definition keymap.h:9
int size(World world=default_execution_context())
Definition run.h:131
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition run.h:104
ttg::World & get_default_world()
Definition world.h:81
World default_execution_context()
Accesses the default backend's default execution context.
Definition run.h:110
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition coroutine.h:25
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition print.h:139
void trace(const T &t, const Ts &... ts)
Definition trace.h:44
TaskCoroutineID
Definition coroutine.h:223
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition hash.h:82
task that can be resumed after some events occur
Definition coroutine.h:54
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition tt.h:182