ttg.h
Go to the documentation of this file.
78 : WorldImplBase(comm.Get_size(), comm.Get_rank()), m_impl(*new ::madness::World(comm)), m_allocated(true) {}
126 ::madness::World &madworld = ::madness::initialize(argc, argv, num_threads, /* quiet = */ true);
128 std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
157 inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
188 class TT : public ttg::TTBase, public ::madness::WorldObject<TT<keyT, output_terminalsT, derivedT, input_valueTs>> {
193 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
200 static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "input_valueTs cannot contain reference types");
206 // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
240 static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
251 using input_edges_type = ttg::detail::edges_tuple_t<keyT, ttg::meta::decayed_typelist_t<input_tuple_type>>;
252 static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
253 static_assert(ttg::meta::is_none_void_v<input_valueTs> || ttg::meta::is_last_void_v<input_valueTs>,
255 // if have data inputs and (always last) control input, convert last input to Void to make logic easier
259 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
263 using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
264 using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
299 std::array<std::size_t, numins> stream_size; // Expected number of values to receive, to be used for streaming
343 if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
345 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
351 } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
352 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(key, derived->output_terminals));
353 } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
359 } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
360 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, coroutine_id, derived->op(derived->output_terminals));
365 auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
393 // right now can events are not properly implemented, we are only testing the workflow with dummy events
397 static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
401 assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
404 auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
453 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
454 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
466 if (typeid(value) != typeid(std::nullptr_t) && i < std::tuple_size_v<input_values_tuple_type>) {
467 this->get<i, std::decay_t<decltype(value)> &>(args->input_values) = std::forward<decltype(value)>(value);
484 worldobjT::send(keymap(key), &ttT::template set_arg<i, Key, const std::remove_reference_t<decltype(value)> &>,
488 worldobjT::send(keymap(), &ttT::template set_arg<i, void, const std::remove_reference_t<decltype(value)> &>,
496 int junk[] = {0, (invoke_pull_terminal<typename std::tuple_element<IS, input_terminals_type>::type, IS>(
509 // cases 2 and 5 will be implemented by passing dummy ttg::Void object to reduce the number of code branches
527 // should be able on the other end to consume value (since it is just a temporary byproduct of serialization)
529 // this exposes bad design in MemFuncWrapper (probably similar bugs elsewhere?) whose generic operator()
530 // should use memfun's argument types (since that's what will be called) rather than misautodeduce in a
531 // particular context P.S. another issue is in send_am which can execute both remotely (where one can always
537 worldobjT::send(owner, &ttT::template set_arg<i, Key, const std::remove_reference_t<Value> &>, key, value);
543 worldobjT::send(owner, &ttT::template set_arg<i, void, const std::remove_reference_t<Value> &>, value);
561 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key,
575 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error argument is already finalized : ", i);
595 assert(args->stream_size[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
598 assert(static_streamsize[i] <= static_cast<std::size_t>(std::numeric_limits<std::int64_t>::max()));
612 reducer(); // even if this was a control input, must execute the reducer for possible side effects
634 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, key, args);
647 if (curhash == threaddata.key_hash && threaddata.call_depth < 6) { // Needs to be externally configurable
649 // ttg::print("directly invoking:", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
651 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
652 static_cast<derivedT *>(this)->op(key, args->make_input_refs(), output_terminals); // Runs immediately
653 } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
655 } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
656 static_cast<derivedT *>(this)->op(args->make_input_refs(), output_terminals); // Runs immediately
657 } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
664 // ttg::print("enqueuing task", get_name(), key, curhash, threaddata.key_hash, threaddata.call_depth);
675 std::enable_if_t<!ttg::meta::is_void_v<Key> && std::is_void_v<Value>, void> set_arg(const Key &key) {
681 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
695 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
706 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
715 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
737 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
746 ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
769 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
775 throw std::runtime_error("TT::set_argstream_size(n): n less than the number of messages already received");
797 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
819 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
825 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream size for terminal ", i);
828 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": setting stream size for terminal ", i);
831 if (cache.insert(acc, key)) acc->second = new TTArgs(this->priomap(key)); // It will be deleted by the task q
838 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already bounded : ", i);
844 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
851 const auto messages_received_already = args->nargs[i] != std::numeric_limits<std::int64_t>::max();
875 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
880 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
887 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
893 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error finalize called on bounded stream: ", i);
899 ttg::print_error(world.rank(), ":", get_name(), " : ", key, ": error stream is already finalized : ", i);
913 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
924 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
936 assert(found && "TT::finalize_argstream called but no values had been received yet for this key");
942 ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on bounded stream: ", i);
961 // static_cast<derivedT*>(this)->op(key, std::move(args->t), output_terminals); // Runs immediately
997 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1005 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1012 else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type> &&
1014 auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
1026 auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
1047 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
1053 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
1058 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT inputs to ", sizeof...(IS),
1063 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
1068 ttg::trace(world.rank(), ":", get_name(), " : connected ", sizeof...(IS), " TT outputs to ", sizeof...(IS),
1075 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1088 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1092 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::TT: #output names != #output terminals";
1102 TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
1109 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1110 const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
1123 ttg::print_error(world.rank(), ":", get_name(), "#input_names", innames.size(), "!= #input_terminals",
1127 if (outnames.size() != numouts) throw this->get_name() + ":madness::ttg::T: #output names != #output terminals";
1134 // DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
1140 TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
1163 for (std::size_t i = 0; i < numins; i++) std::cerr << (item.second->nargs[i] == 0 ? "T" : "F") << " ";
1254 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1271 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1283 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
1298 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
1306 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:84
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:91
Definition: world.h:17
Definition: ttg.h:188
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1109
std::enable_if_t< key_is_void, void > set_argstream_size(std::size_t size)
Definition: ttg.h:735
std::tuple_element_t< i, input_terminals_type > * in()
Returns pointer to input terminal i to facilitate connection — terminal cannot be copied,...
Definition: ttg.h:1242
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:681
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:257
void get_terminal_data(const int owner, const Key &key)
Definition: ttg.h:477
void invoke_pull_terminal(terminalT &in, const Key &key, TTArgs *args)
Definition: ttg.h:433
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:1189
void fence() override
Waits for the entire TTG associated with this TT to be completed (collective)
Definition: ttg.h:1238
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:1217
std::enable_if_t<!ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg(const Key &key)
Definition: ttg.h:675
std::enable_if_t<!key_is_void, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:873
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:715
std::enable_if_t<!ttg::meta::is_void_v< Key >, int > owner(const Key &key) const
Definition: ttg.h:1324
static __thread struct ttg_madness::TT::@0 threaddata
std::tuple_element_t< i, output_terminalsT > * out()
Returns pointer to output terminal for purpose of connection — terminal cannot be copied,...
Definition: ttg.h:1248
static constexpr bool derived_has_device_op()
Definition: ttg.h:232
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:227
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:217
std::enable_if_t< ttg::meta::is_void_v< Key > &&std::is_void_v< Value >, void > set_arg()
Definition: ttg.h:687
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:264
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Manual injection of a task with all input arguments specified as a tuple.
Definition: ttg.h:1254
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1140
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:695
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:706
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:1222
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:796
std::enable_if_t< ttg::meta::is_void_v< Key >, int > owner() const
Definition: ttg.h:1331
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Manual injection of a task that has no arguments.
Definition: ttg.h:1283
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:726
std::enable_if_t<!key_is_void, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:817
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:922
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:259
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Manual injection of a key-free task with all input arguments specified as a tuple.
Definition: ttg.h:1271
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:1075
void invoke_pull_terminals(std::index_sequence< IS... >, const Key &key, TTArgs *args)
Definition: ttg.h:495
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Manual injection of a task that has no key or arguments.
Definition: ttg.h:1298
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:251
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:263
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:1102
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:268
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:250
Definition: ttg.h:67
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl(const WorldImpl &other)=delete
WorldImpl(WorldImpl &&other)=delete
WorldImpl & operator=(WorldImpl &&other)=delete
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
Definition: make_index_sequence.hpp:46
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:148
void ttg_initialize(int argc, char **argv, int num_threads=-1)
Definition: ttg.h:124
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:162
void ttg_broadcast(ttg::World world, T &data, int source_rank)
Definition: ttg.h:175
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:157
ttg::World ttg_default_execution_context()
Definition: ttg.h:137
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:543
Definition: edge.h:167