ttg.h
Go to the documentation of this file.
77 : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
125 ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
127 std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
156 inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
187 class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs>> {
192 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
199 static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
205 // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
239 static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
250 using input_edges_type = ttg::detail::edges_tuple_t<keyT, ttg::meta::decayed_typelist_t<input_tuple_type>>;
251 static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
252 static_assert(ttg::meta::is_none_void_v<input_valueTs> || ttg::meta::is_last_void_v<input_valueTs>,
254 // if have data inputs and (always last) control input, convert last input to Void to make logic easier
258 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
262 using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
263 using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
298 std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
342 if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
344 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
350 } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
351 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
352 } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
358 } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
359 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
364 auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
392 // right now can events are not properly implemented, we are only testing the workflow with dummy events
396 static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
400 assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
403 auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
452 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
453 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
465 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
466 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
483 worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
487 worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
495 int junk[] = {0, (invoke_pull_terminal<typename std::tuple_element<IS, input_terminals_type>::type, IS>(
508 // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
526 // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
528 // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
529 // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
530 // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
536 worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
542 worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
560 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
574 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
594 assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
597 assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
611 reducer(); // even if this was a control input, must execute the reducer for possible side effects
633 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
646 if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
648 // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
650 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
651 static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
652 } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
654 } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
655 static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
656 } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
663 // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
674 std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
680 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
694 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
705 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
714 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
736 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
745 ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
768 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
774 throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
796 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
818 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
824 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
827 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
830 if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
837 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
843 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
850 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
874 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
879 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
886 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
892 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
898 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
912 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
923 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
935 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
941 ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
960 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
996 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1004 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1011 else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1013 auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1025 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1046 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
1052 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1057 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1062 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1067 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1074 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1087 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1091 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1101 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1108 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1109 const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1122 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1126 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1133 // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1139 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1162 for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1236 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1253 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1265 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1280 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1288 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:84
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:91
Definition: world.h:17
Definition: ttg.h:187
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1108
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition: ttg.h:734
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition: ttg.h:1224
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:680
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:256
void get_terminal_data(const int owner, const Key &key)
Definition: ttg.h:476
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition: ttg.h:432
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:1188
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition: ttg.h:1220
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition: ttg.h:674
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:872
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:714
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition: ttg.h:1306
static __thread struct ttg_madness::TT::@0 threaddata
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition: ttg.h:1230
static constexpr bool derived_has_device_op()
Definition: ttg.h:231
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:226
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:216
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition: ttg.h:686
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:263
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition: ttg.h:1236
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1139
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:694
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:705
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:795
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition: ttg.h:1313
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition: ttg.h:1265
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:725
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:816
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:921
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:258
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition: ttg.h:1253
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1074
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition: ttg.h:494
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition: ttg.h:1280
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:250
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:262
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1101
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:267
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:249
Definition: ttg.h:66
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl(const WorldImpl &other)=delete
WorldImpl(WorldImpl &&other)=delete
WorldImpl & operator=(WorldImpl &&other)=delete
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
Definition: make_index_sequence.hpp:46
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:147
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition: ttg.h:123
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:161
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition: ttg.h:174
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:156
ttg::World ttg_default_execution_context()
Definition: ttg.h:136
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:545
Definition: edge.h:167