ttg.h
Go to the documentation of this file.
84 : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
132 ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
134 std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
163 inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
193 template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
194 class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs, Space>> {
195 static_assert(Space == ttg::ExecutionSpace::Host, "MADNESS backend only supports Host Execution Space");
200 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
207 static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
213 // For now use the same type for unary and streaming input terminals; stream reducers are assigned at runtime
247 static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
258 using input_edges_type = ttg::detail::edges_tuple_t<keyT, ttg::meta::decayed_typelist_t<input_tuple_type>>;
259 static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
260 static_assert(ttg::meta::is_none_void_v<input_valueTs> || ttg::meta::is_last_void_v<input_valueTs>,
262 // if there are data inputs and an (always last) control input, convert the last input to Void to simplify the logic
266 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
270 using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
271 using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
306 std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
350 if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
352 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
358 } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
359 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
360 } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
366 } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
367 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
372 auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
400 // right now events are not properly implemented; we are only testing the workflow with dummy events
404 static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
408 assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
411 auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
460 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
461 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
473 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
474 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
491 worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
495 worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
503 int junk[] = {0, (invoke_pull_terminal<typename std::tuple_element<IS, input_terminals_type>::type, IS>(
516 // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
534 // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
536 // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
537 // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
538 // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
544 worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
550 worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
568 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
582 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
602 assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
605 assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
619 reducer(); // even if this was a control input, must execute the reducer for possible side effects
641 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
654 if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
656 // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
658 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
659 static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
660 } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
662 } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
663 static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
664 } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
671 // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
682 std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
688 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
702 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
713 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
722 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
744 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
753 ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
776 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
782 throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
804 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
826 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
832 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
835 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
838 if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
845 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
851 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
858 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
882 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
887 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
894 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
900 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
906 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
920 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
931 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
943 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
949 ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
968 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
1004 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1012 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1019 else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1021 auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1033 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1054 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
1060 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1065 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1070 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1075 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1082 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1095 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1099 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1109 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1116 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1117 const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1130 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1134 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1141 // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1147 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1170 for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1261 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1278 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1290 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1305 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1313 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:85
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:92
Definition: world.h:17
Definition: ttg.h:194
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:257
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:713
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition: ttg.h:1245
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:1196
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:1229
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:688
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1082
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:258
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:1224
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:733
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:224
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition: ttg.h:742
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition: ttg.h:1278
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition: ttg.h:1305
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:234
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1116
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1147
void get_terminal_data(const int owner, const Key &key)
Definition: ttg.h:484
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:266
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:929
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:824
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition: ttg.h:1331
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:264
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition: ttg.h:694
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:270
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition: ttg.h:1338
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:275
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:271
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition: ttg.h:1255
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition: ttg.h:682
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1109
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition: ttg.h:1261
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition: ttg.h:502
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:803
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:880
static constexpr bool derived_has_device_op()
Definition: ttg.h:239
static __thread struct ttg_madness::TT::@0 threaddata
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition: ttg.h:1290
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition: ttg.h:440
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:722
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition: ttg.h:1249
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:702
Definition: ttg.h:73
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl(const WorldImpl &other)=delete
WorldImpl(WorldImpl &&other)=delete
WorldImpl & operator=(WorldImpl &&other)=delete
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
Definition: make_index_sequence.hpp:46
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:154
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition: ttg.h:130
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:168
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition: ttg.h:181
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:163
ttg::World ttg_default_execution_context()
Definition: ttg.h:143
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:543
Definition: edge.h:167