distributed.cc
This is the iterative diamond DAG with variable number of inputs using the reducing terminals of Template Task Graph, adapted to run in distributed: iteratively, a reducing diamond of data-dependent width is run, until the amount of data gathered at the bottom of the diamond exceeds a given threshold. First and last tasks of each diamond are run on rank 0, while the tasks inside the diamond are distributed between the ranks in a round-robin fashion.
#include <ttg.h>
#include <ttg/serialization/std/pair.h>
#include <ttg/util/hash/std/pair.h>
using Key2 = std::pair<int, int>;
namespace std {
os << "{" << std::get<0>(key) << ", " << std::get<1>(key) << "}";
return os;
}
} // namespace std
ttg::print("Called task B(", key, ") on rank", ttg::ttg_default_execution_context().rank(), "with input data ", input); ttg::send<0>(std::get<0>(key), input + 1.0, out);
}
ttg::print("Called task C(", k, ") on rank", ttg::ttg_default_execution_context().rank(), "with input ", sum);
if (sum < threshold) {
ttg::send<0>(k + 1, sum, out);
} else {
}
}
ttg::initialize(argc, argv, -1);
ttg::Edge<int, double> B_C("B(k, i)->C(k)");
ttg::Edge<int, double> C_A("C(k)->A(k)");
wc->set_input_reducer<0>([](double &a, const double &b) { a += b; });
auto wa(ttg::make_tt([&](const int &k, const double &input, std::tuple<ttg::Out<Key2, double>> &out) {
wc->set_argstream_size<0>(k, k+1);
for(int i = 0; i < k+1; i++) {
}
wa->set_keymap([&](const int &k) { return 0; });
wc->set_keymap([&](const int &k) { return 0; });
if (wa->get_world().rank() == 0) wa->invoke(0, 0.0);
ttg::execute();
return EXIT_SUCCESS;
}
Definition: terminal.h:429
std::ostream & operator<<(std::ostream &os, ttg::device::Device device)
Definition: device.h:63
auto make_tt(funcT &&func, const std::tuple< ttg::Edge< keyT, input_edge_valuesT >... > &inedges=std::tuple<>{}, const std::tuple< output_edgesT... > &outedges=std::tuple<>{}, const std::string &name="wrapper", const std::vector< std::string > &innames=std::vector< std::string >(sizeof...(input_edge_valuesT), "input"), const std::vector< std::string > &outnames=std::vector< std::string >(sizeof...(output_edgesT), "output"))
Factory function to assist in wrapping a callable with signature.
Definition: make_tt.h:491
ttg::World ttg_default_execution_context()
Definition: ttg.h:143
void execute(ttg::World world)
Starts the execution in the given execution context.
Definition: run.h:74
void send(const keyT &key, valueT &&value, ttg::Out< keyT, valueT > &t)
Sends a task id and a value to the given output terminal.
Definition: func.h:158
void initialize(int argc, char **argv, int num_threads=-1, RestOfArgs &&...)
std::enable_if_t<(std::is_convertible_v< decltype(*(std::declval< TTBasePtrs >))), TTBase & > bool make_graph_executable(TTBasePtrs &&...tts)
Definition: func.h:80
void fence(ttg::World world)
Returns when all tasks associated with the given execution context have finished on all ranks.
Definition: run.h:81
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:130
std::enable_if_t<!meta::is_void_v< keyT >, void > finalize(const keyT &key, ttg::Out< out_keyT, out_valueT > &t)
Finalize streaming input terminals connecting to the given output terminal for tasks identified by ke...
Definition: func.h:543