2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
13 #define TTG_PARSEC_DEFER_WRITER false
15 #include "ttg/config.h"
20 #include "../../ttg.h"
38 #ifdef TTG_HAVE_DEVICE
56 #include <experimental/type_traits>
71 #if defined(TTG_HAVE_MPI)
73 #if defined(TTG_HAVE_MPIEXT)
80 #include <parsec/class/parsec_hash_table.h>
81 #include <parsec/data_internal.h>
82 #include <parsec/execution_stream.h>
83 #include <parsec/interfaces/interface.h>
84 #include <parsec/mca/device/device.h>
85 #include <parsec/parsec_comm_engine.h>
86 #include <parsec/parsec_internal.h>
87 #include <parsec/scheduling.h>
88 #include <parsec/remote_dep.h>
90 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
91 #include <parsec/mca/device/cuda/device_cuda.h>
93 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
94 #include <parsec/mca/device/hip/device_hip.h>
96 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
97 #include <parsec/mca/device/level_zero/device_level_zero.h>
100 #include <parsec/mca/device/device_gpu.h>
101 #if defined(PARSEC_PROF_TRACE)
102 #include <parsec/profiling.h>
103 #undef PARSEC_TTG_PROFILE_BACKEND
104 #if defined(PARSEC_PROF_GRAPHER)
105 #include <parsec/parsec_prof_grapher.h>
111 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
112 #include <unordered_set>
123 #include <boost/type_index.hpp>
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
170 static void unregister_parsec_tags(
void *_);
183 uint32_t taskpool_id,
188 :
tt_id(fn_id, taskpool_id,
tt_id, param_id, sender, num_keys)
194 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag,
void *
data,
long unsigned int size,
195 int src_rank,
void *obj) {
197 parsec_taskpool_t *tp = NULL;
199 uint64_t op_id = msg->
op_id;
206 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
207 static_set_arg_fct = op_pair.first;
208 static_set_arg_fct(
data,
size, op_pair.second);
209 tp->tdm.module->incoming_message_end(tp, NULL);
211 }
catch (
const std::out_of_range &e) {
212 void *data_cpy = malloc(
size);
213 assert(data_cpy != 0);
216 ", ", op_id,
", ", data_cpy,
", ",
size,
")");
223 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
224 int src,
void *cb_data);
227 static bool im =
false;
236 bool _task_profiling;
238 mpi_space_support = {
true,
false,
false};
240 int query_comm_size() {
242 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
246 int query_comm_rank() {
248 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
252 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine,
void *user_data)
258 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine,
void *user_data)
265 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
266 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
267 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
268 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
271 WorldImpl(
int *argc,
char **argv[],
int ncores, parsec_context_t *c =
nullptr)
274 , own_ctx(c == nullptr)
275 #if defined(PARSEC_PROF_TRACE)
276 , profiling_array(nullptr)
277 , profiling_array_size(0)
279 , _dag_profiling(false)
280 , _task_profiling(false)
283 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
287 #
if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
288 || MPIX_Query_cuda_support()
295 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
296 || MPIX_Query_hip_support()
302 #if defined(PARSEC_PROF_TRACE)
303 if(parsec_profile_enabled) {
305 #if defined(PARSEC_TTG_PROFILE_BACKEND)
306 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_SET_ARG_IMPL",
"fill:000000", 0, NULL,
307 (
int*)&parsec_ttg_profile_backend_set_arg_start,
308 (
int*)&parsec_ttg_profile_backend_set_arg_end);
309 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_BCAST_ARG_IMPL",
"fill:000000", 0, NULL,
310 (
int*)&parsec_ttg_profile_backend_bcast_arg_start,
311 (
int*)&parsec_ttg_profile_backend_bcast_arg_end);
312 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_DATACOPY",
"fill:000000",
313 sizeof(
size_t),
"size{int64_t}",
314 (
int*)&parsec_ttg_profile_backend_allocate_datacopy,
315 (
int*)&parsec_ttg_profile_backend_free_datacopy);
320 if( NULL != parsec_ce.tag_register) {
334 assert(
nullptr == tpool);
335 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
336 tpool->taskpool_id = -1;
338 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
339 tpool->taskpool_name = strdup(
"TTG Taskpool");
340 parsec_taskpool_reserve_id(tpool);
342 tpool->devices_index_mask = 0;
343 for(
int i = 0; i < (int)parsec_nb_devices; i++) {
344 parsec_device_module_t *device = parsec_mca_device_get(i);
345 if( NULL == device )
continue;
346 tpool->devices_index_mask |= (1 << device->device_index);
349 #ifdef TTG_USE_USER_TERMDET
350 parsec_termdet_open_module(tpool,
"user_trigger");
352 parsec_termdet_open_dyn_module(tpool);
360 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
363 #if defined(PARSEC_PROF_TRACE)
364 tpool->profiling_array = profiling_array;
373 parsec_taskpool_started =
false;
393 MPI_Comm
comm()
const {
return MPI_COMM_WORLD; }
396 if (!parsec_taskpool_started) {
397 parsec_enqueue(ctx, tpool);
398 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
399 tpool->tdm.module->taskpool_ready(tpool);
400 [[maybe_unused]]
auto ret = parsec_context_start(ctx);
402 parsec_taskpool_started =
true;
407 #if defined(PARSEC_PROF_TRACE)
411 tpool->profiling_array =
nullptr;
413 assert(NULL != tpool->tdm.monitor);
414 tpool->tdm.module->unmonitor_taskpool(tpool);
415 parsec_taskpool_free(tpool);
421 if (parsec_taskpool_started) {
423 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
424 ttg::trace(
"ttg_parsec(", this->
rank(),
"): final waiting for completion");
426 parsec_context_wait(ctx);
428 parsec_taskpool_wait(tpool);
434 unregister_parsec_tags(
nullptr);
436 parsec_context_at_fini(unregister_parsec_tags,
nullptr);
438 #if defined(PARSEC_PROF_TRACE)
439 if(
nullptr != profiling_array) {
440 free(profiling_array);
441 profiling_array =
nullptr;
442 profiling_array_size = 0;
445 if (own_ctx) parsec_fini(&ctx);
461 virtual void dag_on(
const std::string &filename)
override {
462 #if defined(PARSEC_PROF_GRAPHER)
463 if(!_dag_profiling) {
465 size_t len = strlen(filename.c_str())+32;
466 char ext_filename[len];
467 snprintf(ext_filename, len,
"%s-%d.dot", filename.c_str(),
rank());
468 parsec_prof_grapher_init(ctx, ext_filename);
469 _dag_profiling =
true;
472 ttg::print(
"Error: requested to create '", filename,
"' to create a DAG of tasks,\n"
473 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
478 #if defined(PARSEC_PROF_GRAPHER)
480 parsec_prof_grapher_fini();
481 _dag_profiling =
false;
487 #if defined(PARSEC_PROF_TRACE)
488 _task_profiling =
false;
493 #if defined(PARSEC_PROF_TRACE)
494 _task_profiling =
true;
498 virtual bool profiling()
override {
return _task_profiling; }
501 return mpi_space_support[
static_cast<std::size_t
>(space)];
505 #ifdef TTG_USE_USER_TERMDET
506 if(parsec_taskpool_started) {
508 parsec_taskpool_started =
false;
513 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs = ttg::typelist<>>
515 #if defined(PARSEC_PROF_TRACE)
516 std::stringstream ss;
517 build_composite_name_rec(t->
ttg_ptr(), ss);
524 #if defined(PARSEC_PROF_TRACE)
525 void build_composite_name_rec(
const ttg::TTBase *t, std::stringstream &ss) {
528 build_composite_name_rec(t->
ttg_ptr(), ss);
532 void register_new_profiling_event(
const char *name,
int position) {
533 if(2*position >= profiling_array_size) {
534 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
535 profiling_array = (
int*)realloc((
void*)profiling_array,
536 new_profiling_array_size *
sizeof(int));
537 memset((
void*)&profiling_array[profiling_array_size], 0,
sizeof(
int)*(new_profiling_array_size - profiling_array_size));
538 profiling_array_size = new_profiling_array_size;
539 tpool->profiling_array = profiling_array;
542 assert(0 == tpool->profiling_array[2*position]);
543 assert(0 == tpool->profiling_array[2*position+1]);
547 parsec_profiling_add_dictionary_keyword(name,
"fill:000000", 64,
"key{char[64]}",
548 (
int*)&tpool->profiling_array[2*position],
549 (
int*)&tpool->profiling_array[2*position+1]);
555 if (!parsec_taskpool_started) {
556 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool has not been started, fence is a simple MPI_Barrier");
560 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool is ready for completion");
562 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
564 parsec_taskpool_wait(tpool);
577 parsec_context_t *ctx =
nullptr;
578 bool own_ctx =
false;
579 parsec_taskpool_t *tpool =
nullptr;
580 bool parsec_taskpool_started =
false;
581 #if defined(PARSEC_PROF_TRACE)
582 int *profiling_array;
583 std::size_t profiling_array_size;
587 static void unregister_parsec_tags(
void *_pidx)
589 if(NULL != parsec_ce.tag_unregister) {
598 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
606 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
614 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
622 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
632 if (task ==
nullptr ||
ptr ==
nullptr) {
637 if (NULL != copy && copy->get_ptr() ==
ptr) {
647 if (task ==
nullptr ||
ptr ==
nullptr) {
652 if (NULL != copy && copy->get_ptr() ==
ptr) {
660 if (task ==
nullptr || copy ==
nullptr) {
664 if (MAX_PARAM_COUNT < task->data_count) {
665 throw std::logic_error(
"Too many data copies, check MAX_PARAM_COUNT!");
677 if (copy == task->
copies[i]) {
687 task->
copies[i] =
nullptr;
691 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
692 #warning "ttg::PaRSEC enables data copy tracking"
693 static std::unordered_set<ttg_data_copy_t *> pending_copies;
694 static std::mutex pending_copies_mutex;
696 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
697 static int64_t parsec_ttg_data_copy_uid = 0;
700 template <
typename Value>
702 using value_type = std::decay_t<Value>;
705 copy =
new value_type(std::forward<Value>(value));
706 }
else if constexpr (std::is_rvalue_reference_v<decltype(value)> ||
707 std::is_copy_constructible_v<std::decay_t<Value>>) {
710 throw std::logic_error(
"Trying to copy-construct data that is not copy-constructible!");
712 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
715 copy->size =
sizeof(Value);
716 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
718 static_cast<uint64_t
>(copy->uid),
719 PROFILE_OBJECT_ID_NULL, ©->size,
720 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
723 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
725 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
726 auto rc = pending_copies.insert(copy);
727 assert(std::get<1>(rc));
734 template <std::size_t... IS,
typename Key = keyT>
736 int junk[] = {0, (invoke_pull_terminal<IS>(
737 std::get<IS>(input_terminals), key, task),
743 template<
typename TT, std::
size_t I>
745 if constexpr(!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
751 template<
typename TT, std::size_t... Is>
754 int junk[] = {0, (transfer_ownership_impl<TT, Is>(me->
copies[Is], device), 0)...};
758 template<
typename TT>
759 inline parsec_hook_return_t
hook(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
761 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
764 return me->template invoke_op<ttg::ExecutionSpace::Host>();
767 template<
typename TT>
768 inline parsec_hook_return_t
hook_cuda(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
771 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
773 std::cerr <<
"CUDA hook called without having a CUDA op!" << std::endl;
774 return PARSEC_HOOK_RETURN_ERROR;
778 template<
typename TT>
779 inline parsec_hook_return_t
hook_hip(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
782 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
784 std::cerr <<
"HIP hook called without having a HIP op!" << std::endl;
785 return PARSEC_HOOK_RETURN_ERROR;
789 template<
typename TT>
790 inline parsec_hook_return_t
hook_level_zero(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
793 return me->template invoke_op<ttg::ExecutionSpace::L0>();
795 std::cerr <<
"L0 hook called without having a L0 op!" << std::endl;
796 return PARSEC_HOOK_RETURN_ERROR;
801 template<
typename TT>
802 inline parsec_hook_return_t
evaluate_cuda(
const parsec_task_t *parsec_task) {
805 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
807 return PARSEC_HOOK_RETURN_NEXT;
811 template<
typename TT>
812 inline parsec_hook_return_t
evaluate_hip(
const parsec_task_t *parsec_task) {
815 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
817 return PARSEC_HOOK_RETURN_NEXT;
821 template<
typename TT>
825 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
827 return PARSEC_HOOK_RETURN_NEXT;
832 template <
typename KeyT,
typename ActivationCallbackT>
834 std::vector<KeyT> _keylist;
835 std::atomic<int> _outstanding_transfers;
836 ActivationCallbackT _cb;
841 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
844 int left = --_outstanding_transfers;
846 _cb(std::move(_keylist), _copy);
853 template <
typename ActivationT>
854 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
855 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl,
size_t size,
int remote,
857 parsec_ce.mem_unregister(&lreg);
858 ActivationT *activation =
static_cast<ActivationT *
>(cb_data);
859 if (activation->complete_transfer()) {
862 return PARSEC_SUCCESS;
865 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
866 int src,
void *cb_data) {
867 std::intptr_t *fn_ptr =
static_cast<std::intptr_t *
>(msg);
868 std::function<void(
void)> *fn =
reinterpret_cast<std::function<
void(
void)
> *>(*fn_ptr);
871 return PARSEC_SUCCESS;
874 template <
typename FuncT>
875 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
876 int src,
void *cb_data) {
877 std::intptr_t *iptr =
static_cast<std::intptr_t *
>(msg);
878 FuncT *fn_ptr =
reinterpret_cast<FuncT *
>(*iptr);
881 return PARSEC_SUCCESS;
895 }
else if (readers == 1) {
901 if (1 == readers || readers == copy->
mutable_tag) {
902 std::atomic_thread_fence(std::memory_order_acquire);
915 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
917 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
918 size_t rc = pending_copies.erase(copy);
922 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
926 static_cast<uint64_t
>(copy->uid),
927 PROFILE_OBJECT_ID_NULL, ©->size,
928 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
936 template <
typename Value>
939 bool replace =
false;
942 assert(readers != 0);
968 }
else if (!readonly) {
989 std::atomic_thread_fence(std::memory_order_release);
1002 if (NULL == copy_res) {
1004 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1011 for (
int i = 0; i < deferred_op->
data_count; ++i) {
1012 if (deferred_op->
copies[i] == copy_in) {
1013 deferred_op->
copies[i] = new_copy;
1026 copy_res = new_copy;
1030 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<Value>>().pretty_name() +
" but the type is not copyable");
1039 if (
detail::initialized_mpi())
throw std::runtime_error(
"ttg_parsec::ttg_initialize: can only be called once");
1042 int mpi_initialized;
1043 MPI_Initialized(&mpi_initialized);
1044 if (!mpi_initialized) {
1046 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
1048 throw std::runtime_error(
"ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1061 for (
int i = 0; i < parsec_nb_devices; ++i) {
1062 bool is_gpu = parsec_mca_device_is_gpu(i);
1066 throw std::runtime_error(
"PaRSEC: Found non-GPU device in GPU ID range!");
1071 const char* ttg_max_inline_cstr = std::getenv(
"TTG_MAX_INLINE");
1072 if (
nullptr != ttg_max_inline_cstr) {
1073 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
1086 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1095 template <
typename T>
1097 world.
impl().register_ptr(
ptr);
1100 template <
typename T>
1102 world.
impl().register_ptr(std::move(
ptr));
1106 world.
impl().register_status(status_ptr);
1109 template <
typename Callback>
1111 world.
impl().register_callback(std::forward<Callback>(callback));
1117 double result = 0.0;
1118 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.
impl().comm());
1123 MPI_Barrier(world.
impl().comm());
1128 template <
typename T>
1131 if (world.
rank() == source_rank) {
1134 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.
impl().comm());
1136 unsigned char *buf =
new unsigned char[BUFLEN];
1137 if (world.
rank() == source_rank) {
1140 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.
impl().comm());
1141 if (world.
rank() != source_rank) {
1153 parsec_task_class_t
self;
1158 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs>
1162 static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1163 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1165 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1168 static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1169 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1170 static_assert((ttg::meta::none_has_reference_v<input_valueTs>),
"Input typelist cannot contain reference types");
1171 static_assert(ttg::meta::is_none_Void_v<input_valueTs>,
"ttg::Void is for internal use only, do not use it");
1173 parsec_mempool_t mempools;
1176 template <
typename T>
1177 using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1179 template <
typename T>
1180 using have_hip_op_non_type_t = decltype(T::have_hip_op);
1182 template <
typename T>
1183 using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1187 static constexpr
int numinedges = std::tuple_size_v<input_tuple_type>;
1188 static constexpr
int numins = std::tuple_size_v<actual_input_tuple_type>;
1189 static constexpr
int numouts = std::tuple_size_v<output_terminalsT>;
1190 static constexpr
int numflows = std::max(numins, numouts);
1195 if constexpr (ttg::meta::is_detected_v<have_cuda_op_non_type_t, derivedT>) {
1196 return derivedT::have_cuda_op;
1204 if constexpr (ttg::meta::is_detected_v<have_hip_op_non_type_t, derivedT>) {
1205 return derivedT::have_hip_op;
1213 if constexpr (ttg::meta::is_detected_v<have_level_zero_op_non_type_t, derivedT>) {
1214 return derivedT::have_level_zero_op;
1232 ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1234 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1239 std::tuple_size_v<input_refs_tuple_type>;
1245 template <std::
size_t i,
typename resultT,
typename InTuple>
1246 static resultT
get(InTuple &&intuple) {
1247 return static_cast<resultT
>(std::get<i>(std::forward<InTuple>(intuple)));
1249 template <std::
size_t i,
typename InTuple>
1250 static auto &
get(InTuple &&intuple) {
1251 return std::get<i>(std::forward<InTuple>(intuple));
1261 constexpr
static const size_t task_key_offset =
sizeof(task_t);
1264 output_terminalsT output_terminals;
1270 template <std::size_t... IS>
1271 static constexpr
auto make_set_args_fcts(std::index_sequence<IS...>) {
1272 using resultT = decltype(set_arg_from_msg_fcts);
1273 return resultT{{&TT::set_arg_from_msg<IS>...}};
1275 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_arg_from_msg_fcts =
1276 make_set_args_fcts(std::make_index_sequence<numins>{});
1278 template <std::size_t... IS>
1279 static constexpr
auto make_set_size_fcts(std::index_sequence<IS...>) {
1280 using resultT = decltype(set_argstream_size_from_msg_fcts);
1281 return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
1283 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1284 make_set_size_fcts(std::make_index_sequence<numins>{});
1286 template <std::size_t... IS>
1287 static constexpr
auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1288 using resultT = decltype(finalize_argstream_from_msg_fcts);
1289 return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
1291 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1292 make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
1294 template <std::size_t... IS>
1295 static constexpr
auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1296 using resultT = decltype(get_from_pull_msg_fcts);
1297 return resultT{{&TT::get_from_pull_msg<IS>...}};
1299 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1300 make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
1302 template<std::size_t... IS>
1303 constexpr
static auto make_input_is_const(std::index_sequence<IS...>) {
1304 using resultT = decltype(input_is_const);
1305 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1307 constexpr
static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1310 ttg::meta::detail::keymap_t<keyT> keymap;
1311 ttg::meta::detail::keymap_t<keyT> priomap;
1312 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1314 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1316 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = {
nullptr };
1317 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1318 int num_pullins = 0;
1330 auto op(Args &&...args) {
1331 derivedT *derived =
static_cast<derivedT *
>(
this);
1333 using return_type = decltype(derived->op(std::forward<Args>(args)...));
1334 if constexpr (std::is_same_v<return_type,void>) {
1335 derived->op(std::forward<Args>(args)...);
1339 return derived->op(std::forward<Args>(args)...);
1343 template <std::
size_t i,
typename terminalT,
typename Key>
1344 void invoke_pull_terminal(terminalT &
in,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1345 if (
in.is_pull_terminal) {
1346 auto owner =
in.container.owner(key);
1347 if (owner != world.rank()) {
1348 get_pull_terminal_data_from<i>(owner, key);
1351 set_arg<i>(key, (
in.container).get(key));
1356 template <std::
size_t i,
typename Key>
1357 void get_pull_terminal_data_from(
const int owner,
1359 using msg_t = detail::msg_t;
1360 auto &world_impl = world.impl();
1361 parsec_taskpool_t *tp = world_impl.taskpool();
1362 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), tp->taskpool_id,
1367 pos =
pack(key, msg->bytes, pos);
1368 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1369 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1370 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
1371 sizeof(msg_header_t) + pos);
1374 template <std::size_t... IS,
typename Key = keyT>
1375 void invoke_pull_terminals(std::index_sequence<IS...>,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1376 int junk[] = {0, (invoke_pull_terminal<IS>(
1377 std::get<IS>(input_terminals), key, task),
1382 template <std::size_t... IS>
1383 static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1385 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>
> *>(
1386 task->copies[IS]->get_ptr()))...};
1389 #ifdef TTG_HAVE_DEVICE
1393 template <ttg::ExecutionSpace Space>
1394 static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
1395 parsec_gpu_task_t *gpu_task,
1396 parsec_gpu_exec_stream_t *gpu_stream) {
1398 task_t *task = (task_t*)gpu_task->ec;
1400 ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1402 task->dev_ptr->stream = gpu_stream;
1407 auto dev_data = dev_task.promise();
1410 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1411 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1413 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1415 parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
1417 ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
1421 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1423 parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
1425 ttg::device::detail::set_current(device, hip_stream->hip_stream);
1429 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1431 parsec_level_zero_exec_stream_t *stream;
1432 stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
1434 ttg::device::detail::set_current(device, stream->swq->queue);
1439 static_op<Space>(&task->parsec_task);
1441 ttg::device::detail::reset_current();
1444 int rc = PARSEC_HOOK_RETURN_DONE;
1445 if (
nullptr != task->suspended_task_address) {
1447 dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1448 dev_data = dev_task.promise();
1450 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1451 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1452 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1454 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1455 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1460 rc = PARSEC_HOOK_RETURN_AGAIN;
1470 static_device_stage_in(parsec_gpu_task_t *gtask,
1472 parsec_gpu_exec_stream_t *gpu_stream) {
1474 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1475 if (flow_mask & (1<<i)) {
1476 task_t *task = (task_t*)gtask->ec;
1477 parsec_data_copy_t *copy = task->parsec_task.data[i].data_in;
1479 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1492 static_device_stage_in_hook(parsec_gpu_task_t *gtask,
1494 parsec_gpu_exec_stream_t *gpu_stream) {
1495 static_device_stage_in(gtask, flow_mask, gpu_stream);
1496 return parsec_default_gpu_stage_in(gtask, flow_mask, gpu_stream);
1499 template <ttg::ExecutionSpace Space>
1500 static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
1502 task_t *task = (task_t*)parsec_task;
1503 if (task->dev_ptr->gpu_task ==
nullptr) {
1506 parsec_gpu_task_t *gpu_task;
1508 gpu_task =
static_cast<parsec_gpu_task_t*
>(std::calloc(1,
sizeof(*gpu_task)));
1509 PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
1510 gpu_task->ec = parsec_task;
1511 gpu_task->task_type = 0;
1512 gpu_task->last_data_check_epoch = -1;
1513 gpu_task->pushout = 0;
1514 gpu_task->submit = &TT::device_static_submit<Space>;
1523 task->dev_ptr->gpu_task = gpu_task;
1526 task->parsec_task.chore_mask = PARSEC_DEV_ALL;
1529 task->dev_ptr->task_class = *task->parsec_task.task_class;
1532 static_op<Space>(parsec_task);
1536 parsec_task_class_t& tc = task->dev_ptr->task_class;
1539 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1540 tc.in[i] = gpu_task->flow[i];
1541 tc.out[i] = gpu_task->flow[i];
1543 tc.nb_flows = MAX_PARAM_COUNT;
1547 if (tt->devicemap) {
1549 if constexpr (std::is_void_v<keyT>) {
1554 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1556 if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
1557 parsec_data_t *
data = parsec_task->data[i].data_in->original;
1561 if (
data->owner_device == 0) {
1562 parsec_advise_data_on_device(
data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
1569 task->parsec_task.task_class = &task->dev_ptr->task_class;
1572 return PARSEC_HOOK_RETURN_DONE;
1575 std::cerr <<
"EVALUATE called on task with assigned GPU task!" << std::endl;
1578 return PARSEC_HOOK_RETURN_ERROR;
1582 template <ttg::ExecutionSpace Space>
1583 static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
1588 task_t *task = (task_t*)parsec_task;
1590 if (
nullptr == task->suspended_task_address) {
1592 return PARSEC_HOOK_RETURN_DONE;
1596 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1599 ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1602 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER);
1604 parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1605 assert(NULL != device);
1607 task->dev_ptr->device = device;
1608 parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1609 parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
1611 switch(device->super.type) {
1613 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1614 case PARSEC_DEV_CUDA:
1618 gpu_task->stage_in = static_device_stage_in_hook;
1619 gpu_task->stage_out = parsec_default_gpu_stage_out;
1620 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1624 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1625 case PARSEC_DEV_HIP:
1627 gpu_task->stage_in = static_device_stage_in_hook;
1628 gpu_task->stage_out = parsec_default_gpu_stage_out;
1629 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1633 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1634 case PARSEC_DEV_LEVEL_ZERO:
1636 gpu_task->stage_in = static_device_stage_in_hook;
1637 gpu_task->stage_out = parsec_default_gpu_stage_out;
1638 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1645 ttg::print_error(task->tt->get_name(),
" : received mismatching device type ", (
int)device->super.type,
" from PaRSEC");
1647 return PARSEC_HOOK_RETURN_DONE;
1651 template <ttg::ExecutionSpace Space>
1652 static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1654 task_t *task = (task_t*)parsec_task;
1655 void* suspended_task_address =
1656 #ifdef TTG_HAVE_COROUTINE
1657 task->suspended_task_address;
1662 if (suspended_task_address ==
nullptr) {
1664 ttT *baseobj = task->tt;
1665 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1668 if (obj->tracing()) {
1669 if constexpr (!ttg::meta::is_void_v<keyT>)
1670 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": executing");
1672 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : executing");
1675 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1676 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1677 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, std::move(input), obj->output_terminals));
1678 }
else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1679 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1680 }
else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1681 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1682 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(std::move(input), obj->output_terminals));
1683 }
else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1684 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1692 #ifdef TTG_HAVE_COROUTINE
1695 #ifdef TTG_HAVE_DEVICE
1697 ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1701 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1702 task->tt->set_outputs_tls_ptr();
1704 if (coro.completed()) {
1706 suspended_task_address =
nullptr;
1708 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1714 assert(ret.ready());
1715 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1716 task->tt->set_outputs_tls_ptr();
1718 if (ret.completed()) {
1720 suspended_task_address =
nullptr;
1728 for (
auto &event_ptr : events) {
1729 event_ptr->finish();
1733 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1735 task->suspended_task_address = suspended_task_address;
1743 #ifdef TTG_HAVE_COROUTINE
1744 task->suspended_task_address = suspended_task_address;
1746 if (suspended_task_address ==
nullptr) {
1747 ttT *baseobj = task->tt;
1748 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1749 if (obj->tracing()) {
1750 if constexpr (!ttg::meta::is_void_v<keyT>)
1751 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": done executing");
1753 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1757 return PARSEC_HOOK_RETURN_DONE;
1760 template <ttg::ExecutionSpace Space>
1761 static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1762 task_t *task =
static_cast<task_t*
>(parsec_task);
1764 void* suspended_task_address =
1765 #ifdef TTG_HAVE_COROUTINE
1766 task->suspended_task_address;
1770 if (suspended_task_address ==
nullptr) {
1771 ttT *baseobj = (
ttT *)task->object_ptr;
1772 derivedT *obj = (derivedT *)task->object_ptr;
1775 if constexpr (!ttg::meta::is_void_v<keyT>) {
1776 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1777 }
else if constexpr (ttg::meta::is_void_v<keyT>) {
1778 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1784 #ifdef TTG_HAVE_COROUTINE
1786 assert(ret.ready());
1788 if (ret.completed()) {
1790 suspended_task_address =
nullptr;
1799 task->suspended_task_address = suspended_task_address;
1801 if (suspended_task_address) {
1804 return PARSEC_HOOK_RETURN_AGAIN;
1807 return PARSEC_HOOK_RETURN_DONE;
1810 template <std::
size_t i>
1811 static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1812 using rtask_t = detail::reducer_task_t;
1813 using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1814 constexpr
const bool val_is_void = ttg::meta::is_void_v<value_t>;
1815 constexpr
const bool input_is_const = std::is_const_v<value_t>;
1816 rtask_t *rtask = (rtask_t*)parsec_task;
1817 task_t *parent_task =
static_cast<task_t*
>(rtask->parent_task);
1818 ttT *baseobj = parent_task->tt;
1819 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1821 auto& reducer = std::get<i>(baseobj->input_reducers);
1825 if (obj->tracing()) {
1826 if constexpr (!ttg::meta::is_void_v<keyT>)
1827 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": reducer executing");
1829 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : reducer executing");
1833 detail::ttg_data_copy_t *target_copy;
1834 target_copy = parent_task->copies[i];
1835 assert(val_is_void ||
nullptr != target_copy);
1838 std::size_t
size = 0;
1839 assert(parent_task->streams[i].reduce_count > 0);
1840 if (rtask->is_first) {
1841 if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1843 if (obj->tracing()) {
1844 if constexpr (!ttg::meta::is_void_v<keyT>)
1845 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": first reducer empty");
1847 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : first reducer empty");
1850 return PARSEC_HOOK_RETURN_DONE;
1858 if constexpr(!val_is_void) {
1860 detail::ttg_data_copy_t *source_copy;
1861 parsec_list_item_t *item;
1862 item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
1863 if (
nullptr == item) {
1867 source_copy = ((detail::ttg_data_copy_self_t *)(item))->
self;
1868 assert(target_copy->num_readers() == target_copy->mutable_tag);
1869 assert(source_copy->num_readers() > 0);
1870 reducer(*
reinterpret_cast<std::decay_t<value_t> *
>(target_copy->get_ptr()),
1871 *
reinterpret_cast<std::decay_t<value_t> *
>(source_copy->get_ptr()));
1873 }
else if constexpr(val_is_void) {
1877 size = ++parent_task->streams[i].size;
1879 }
while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1883 bool complete = (
size >= parent_task->streams[i].goal);
1888 if (complete && c == 0) {
1889 if constexpr(input_is_const) {
1891 target_copy->reset_readers();
1894 parent_task->remove_from_hash =
true;
1895 parent_task->release_task(parent_task);
1900 if (obj->tracing()) {
1901 if constexpr (!ttg::meta::is_void_v<keyT>)
1902 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": done executing");
1904 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1907 return PARSEC_HOOK_RETURN_DONE;
1912 template <
typename T>
1913 uint64_t
unpack(T &obj,
void *_bytes, uint64_t pos) {
1915 uint64_t payload_size;
1916 if constexpr (!dd_t::serialize_size_is_const) {
1919 payload_size = dd_t::payload_size(&obj);
1921 pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
1925 template <
typename T>
1928 uint64_t payload_size = dd_t::payload_size(&obj);
1931 copy->iovec_reset();
1934 if constexpr (!dd_t::serialize_size_is_const) {
1937 pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
1943 "Trying to unpack as message that does not hold enough bytes to represent a single header");
1945 derivedT *obj =
reinterpret_cast<derivedT *
>(bop);
1946 switch (hd->
fn_id) {
1950 assert(hd->
param_id < obj->set_arg_from_msg_fcts.size());
1951 auto member = obj->set_arg_from_msg_fcts[hd->
param_id];
1961 assert(hd->
param_id < obj->set_argstream_size_from_msg_fcts.size());
1962 auto member = obj->set_argstream_size_from_msg_fcts[hd->
param_id];
1968 assert(hd->
param_id < obj->finalize_argstream_from_msg_fcts.size());
1969 auto member = obj->finalize_argstream_from_msg_fcts[hd->
param_id];
1975 assert(hd->
param_id < obj->get_from_pull_msg_fcts.size());
1976 auto member = obj->get_from_pull_msg_fcts[hd->
param_id];
1987 auto &world_impl = world.impl();
1988 parsec_execution_stream_s *es = world_impl.execution_stream();
1989 int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
1990 return &mempools.thread_mempools[index];
1993 template <
size_t i,
typename valueT>
1997 parsec_execution_stream_s *es = world.impl().execution_stream();
1999 dummy =
new (parsec_thread_mempool_allocate(mempool))
task_t(mempool, &this->
self);
2007 dummy->
parsec_task.taskpool = world.impl().taskpool();
2014 parsec_task_t *task_ring =
nullptr;
2015 for (
auto &&key : keylist) {
2017 if constexpr (std::is_copy_constructible_v<valueT>) {
2018 set_arg_local_impl<i>(key, *
reinterpret_cast<valueT *
>(copy->
get_ptr()), copy, &task_ring);
2022 static_assert(!std::is_reference_v<valueT>);
2024 set_arg_local_impl<i>(key, std::move(*
reinterpret_cast<valueT *
>(copy->
get_ptr())), copy, &task_ring);
2026 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<valueT>>().pretty_name() +
" but the type is not copyable");
2031 if (
nullptr != task_ring) {
2032 auto &world_impl = world.impl();
2033 parsec_task_t *vp_task_ring[1] = { task_ring };
2034 __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2041 complete_task_and_release(es, &dummy->
parsec_task);
2042 parsec_thread_mempool_free(mempool, &dummy->
parsec_task);
2054 template <std::
size_t i>
2056 using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2058 msg_t *msg =
static_cast<msg_t *
>(
data);
2059 if constexpr (!ttg::meta::is_void_v<keyT>) {
2063 uint64_t key_end_pos;
2064 std::vector<keyT> keylist;
2065 int num_keys = msg->tt_id.num_keys;
2066 keylist.reserve(num_keys);
2067 auto rank = world.rank();
2068 for (
int k = 0; k < num_keys; ++k) {
2070 pos =
unpack(key, msg->bytes, pos);
2071 assert(keymap(key) ==
rank);
2072 keylist.push_back(std::move(key));
2078 if constexpr (!ttg::meta::is_void_v<valueT>) {
2079 using decvalueT = std::decay_t<valueT>;
2080 int32_t num_iovecs = msg->tt_id.num_iovecs;
2085 using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2088 metadata_t metadata;
2089 pos =
unpack(metadata, msg->bytes, pos);
2098 parsec_gpu_data_copy_t* gpu_elem;
2099 gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2102 while (i < parsec_nb_devices) {
2103 if (
nullptr == gpu_elem) {
2104 gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2105 gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2106 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2107 gpu_elem->version = 0;
2108 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2110 if (
nullptr == gpu_elem->device_private) {
2111 gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2112 if (
nullptr == gpu_elem->device_private) {
2121 pos =
unpack(*
static_cast<decvalueT *
>(copy->
get_ptr()), msg->bytes, pos);
2126 if (num_iovecs == 0) {
2127 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2132 int remote = msg->tt_id.sender;
2133 assert(remote < world.size());
2135 auto &val = *
static_cast<decvalueT *
>(copy->
get_ptr());
2137 bool inline_data = msg->tt_id.inline_data;
2141 auto handle_iovecs_fn =
2142 [&](
auto&& iovecs) {
2146 for (
auto &&iov : iovecs) {
2148 std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes);
2149 pos += iov.num_bytes;
2153 parsec_ce_tag_t cbtag;
2154 std::memcpy(&cbtag, msg->bytes + pos,
sizeof(cbtag));
2155 pos +=
sizeof(cbtag);
2160 set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2161 this->world.impl().decrement_inflight_msg();
2164 using ActivationT = std::decay_t<decltype(*activation)>;
2166 for (
auto &&iov : iovecs) {
2168 parsec_ce_mem_reg_handle_t rreg;
2169 int32_t rreg_size_i;
2170 std::memcpy(&rreg_size_i, msg->bytes + pos,
sizeof(rreg_size_i));
2171 pos +=
sizeof(rreg_size_i);
2172 rreg =
static_cast<parsec_ce_mem_reg_handle_t
>(msg->bytes + pos);
2176 std::intptr_t fn_ptr;
2177 std::memcpy(&fn_ptr, msg->bytes + pos,
sizeof(fn_ptr));
2178 pos +=
sizeof(fn_ptr);
2181 parsec_ce_mem_reg_handle_t lreg;
2183 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2184 iov.num_bytes, &lreg, &lreg_size);
2185 world.impl().increment_inflight_msg();
2188 parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
2189 &detail::get_complete_cb<ActivationT>, activation,
2191 cbtag, &fn_ptr,
sizeof(std::intptr_t));
2197 handle_iovecs_fn(descr.get_data(val));
2203 assert(num_iovecs == nv);
2207 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2211 }
else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2212 for (
auto &&key : keylist) {
2213 set_arg<i, keyT, ttg::Void>(key,
ttg::Void{});
2217 }
else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2218 using decvalueT = std::decay_t<valueT>;
2221 unpack(val, msg->bytes, 0);
2222 set_arg<i, keyT, valueT>(std::move(val));
2224 }
else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2225 set_arg<i, keyT, ttg::Void>(
ttg::Void{});
2231 template <std::
size_t i>
2234 msg_t *msg =
static_cast<msg_t *
>(
data);
2235 if constexpr (!ttg::meta::is_void_v<keyT>) {
2238 auto rank = world.rank();
2240 pos =
unpack(key, msg->bytes, pos);
2241 assert(keymap(key) ==
rank);
2242 finalize_argstream<i>(key);
2244 auto rank = world.rank();
2245 assert(keymap() ==
rank);
2246 finalize_argstream<i>();
2250 template <std::
size_t i>
2253 auto msg =
static_cast<msg_t *
>(
data);
2255 if constexpr (!ttg::meta::is_void_v<keyT>) {
2257 auto rank = world.rank();
2259 pos =
unpack(key, msg->bytes, pos);
2260 assert(keymap(key) ==
rank);
2261 std::size_t argstream_size;
2262 pos =
unpack(argstream_size, msg->bytes, pos);
2263 set_argstream_size<i>(key, argstream_size);
2265 auto rank = world.rank();
2266 assert(keymap() ==
rank);
2267 std::size_t argstream_size;
2268 pos =
unpack(argstream_size, msg->bytes, pos);
2269 set_argstream_size<i>(argstream_size);
2273 template <std::
size_t i>
2276 msg_t *msg =
static_cast<msg_t *
>(
data);
2277 auto &
in = std::get<i>(input_terminals);
2278 if constexpr (!ttg::meta::is_void_v<keyT>) {
2282 pos =
unpack(key, msg->bytes, pos);
2283 set_arg<i>(key, (
in.container).get(key));
2287 template <std::
size_t i,
typename Key,
typename Value>
2288 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2289 const Key &key, Value &&value) {
2290 set_arg_local_impl<i>(key, std::forward<Value>(value));
2293 template <std::
size_t i,
typename Key = keyT,
typename Value>
2294 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2296 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2299 template <std::
size_t i,
typename Key,
typename Value>
2300 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2301 const Key &key,
const Value &value) {
2302 set_arg_local_impl<i>(key, value);
2305 template <std::
size_t i,
typename Key = keyT,
typename Value>
2306 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2307 const Value &value) {
2308 set_arg_local_impl<i>(
ttg::Void{}, value);
2311 template <std::
size_t i,
typename Key = keyT,
typename Value>
2312 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2313 std::shared_ptr<const Value> &valueptr) {
2314 set_arg_local_impl<i>(
ttg::Void{}, *valueptr);
2317 template <
typename Key>
2319 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2320 auto &world_impl = world.impl();
2323 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2324 int32_t priority = 0;
2325 if constexpr (!keyT_is_Void) {
2326 priority = priomap(key);
2328 newtask =
new (taskobj)
task_t(key, mempool, &this->
self, world_impl.taskpool(),
this, priority);
2330 priority = priomap();
2332 newtask =
new (taskobj)
task_t(mempool, &this->
self, world_impl.taskpool(),
this, priority);
2335 for (
int i = 0; i < static_stream_goal.size(); ++i) {
2336 newtask->
streams[i].goal = static_stream_goal[i];
2344 template <std::
size_t i>
2348 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2349 auto &world_impl = world.impl();
2352 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2354 int32_t priority = 0;
2355 if constexpr (!keyT_is_Void) {
2356 priority = priomap(task->
key);
2359 priority = priomap();
2364 world_impl.taskpool(), priority, is_first);
2371 template <std::
size_t i,
typename Key,
typename Value>
2373 parsec_task_t **task_ring =
nullptr) {
2374 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2375 constexpr
const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2376 constexpr
const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2377 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2380 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": received value for argument : ", i);
2382 parsec_key_t hk = 0;
2383 if constexpr (!keyT_is_Void) {
2384 hk =
reinterpret_cast<parsec_key_t
>(&key);
2385 assert(keymap(key) == world.rank());
2389 auto &world_impl = world.impl();
2390 auto &reducer = std::get<i>(input_reducers);
2392 bool remove_from_hash =
true;
2393 bool discover_task =
true;
2394 bool get_pull_data =
false;
2395 bool has_lock =
false;
2397 if (numins > 1 || reducer) {
2400 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
2402 world_impl.increment_created();
2405 if( world_impl.dag_profiling() ) {
2406 #if defined(PARSEC_PROF_GRAPHER)
2407 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2411 }
else if (!reducer && numins == (task->
in_data_count + 1)) {
2413 parsec_hash_table_nolock_remove(&
tasks_table, hk);
2414 remove_from_hash =
false;
2418 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2423 world_impl.increment_created();
2424 remove_from_hash =
false;
2425 if( world_impl.dag_profiling() ) {
2426 #if defined(PARSEC_PROF_GRAPHER)
2427 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2433 if( world_impl.dag_profiling() ) {
2434 #if defined(PARSEC_PROF_GRAPHER)
2439 if(orig_index >= 0) {
2440 snprintf(orig_str, 32,
"%d", orig_index);
2442 strncpy(orig_str,
"_", 32);
2444 snprintf(dest_str, 32,
"%lu", i);
2445 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2446 .flow_index = 0, .flow_datatype_mask = ~0 };
2447 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2448 .flow_index = 0, .flow_datatype_mask = ~0 };
2459 if (
nullptr != copy) {
2461 copy = detail::register_data_copy<valueT>(copy, task,
is_const);
2472 if (reducer && 1 != task->
streams[i].goal) {
2473 auto submit_reducer_task = [&](
auto *parent_task){
2475 std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2480 reduce_task = create_new_reducer_task<i>(parent_task,
false);
2485 if constexpr (!ttg::meta::is_void_v<valueT>) {
2488 if (
nullptr == (copy = task->
copies[i])) {
2489 using decay_valueT = std::decay_t<valueT>;
2494 reduce_task = create_new_reducer_task<i>(task,
true);
2498 task->
streams[i].reduce_count.store(1, std::memory_order_relaxed);
2514 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2517 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2523 parsec_lifo_push(&task->
streams[i].reduce_copies, ©->
super);
2524 submit_reducer_task(task);
2528 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2530 submit_reducer_task(task);
2540 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2543 if constexpr (!valueT_is_Void) {
2544 if (
nullptr != task->
copies[i]) {
2546 throw std::logic_error(
"bad set arg");
2566 if constexpr (!ttg::meta::is_void_v<keyT>) {
2567 if (get_pull_data) {
2574 parsec_task_t **task_ring =
nullptr) {
2575 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2584 count = parsec_atomic_fetch_inc_int32(&task->
in_data_count) + 1;
2585 assert(count <=
self.dependencies_goal);
2588 auto &world_impl = world.impl();
2589 ttT *baseobj = task->
tt;
2591 if (count == numins) {
2592 parsec_execution_stream_t *es = world_impl.execution_stream();
2593 parsec_key_t hk = task->
pkey();
2595 if constexpr (!keyT_is_Void) {
2602 if (
nullptr == task_ring) {
2603 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2604 __parsec_schedule_vp(es, vp_task_rings, 0);
2605 }
else if (*task_ring ==
nullptr) {
2610 parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->
parsec_task.super,
2611 offsetof(parsec_task_t, priority));
2613 }
else if constexpr (!ttg::meta::is_void_v<keyT>) {
2614 if ((baseobj->num_pullins + count == numins) && baseobj->
is_lazy_pull()) {
2622 template <std::
size_t i,
typename Key,
typename Value>
2623 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
const Key &key,
2625 set_arg_impl<i>(key, std::forward<Value>(value));
2629 template <std::
size_t i,
typename Key,
typename Value>
2630 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(Value &&value) {
2631 set_arg_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2634 template <std::
size_t i,
typename Key = keyT>
2635 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_arg() {
2640 template <std::
size_t i,
typename Key>
2641 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
set_arg(
const Key &key) {
2645 template<
typename Value,
typename Key>
2647 using decvalueT = std::decay_t<Value>;
2648 bool inline_data =
false;
2650 std::size_t iov_size = 0;
2651 std::size_t metadata_size = 0;
2654 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2655 iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2656 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2657 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2663 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2667 std::size_t pack_size = key_pack_size + metadata_size + iov_size;
2675 template <std::
size_t i,
typename Key,
typename Value>
2678 using decvalueT = std::decay_t<Value>;
2679 using norefvalueT = std::remove_reference_t<Value>;
2680 norefvalueT *value_ptr = &value;
2682 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2683 if(world.impl().profiling()) {
2684 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2688 if constexpr (!ttg::meta::is_void_v<Key>)
2689 owner = keymap(key);
2692 if (owner == world.rank()) {
2693 if constexpr (!ttg::meta::is_void_v<keyT>)
2694 set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2696 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value), copy_in);
2697 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2698 if(world.impl().profiling()) {
2699 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2708 auto &world_impl = world.impl();
2711 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2714 if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2718 if (
nullptr == copy) {
2720 if (
nullptr == copy) {
2724 value_ptr =
static_cast<norefvalueT*
>(copy->
get_ptr());
2729 msg->tt_id.inline_data = inline_data;
2731 auto handle_iovec_fn = [&](
auto&& iovecs){
2735 for (
auto &&iov : iovecs) {
2736 std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
2737 pos += iov.num_bytes;
2744 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
2745 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
2746 pos +=
sizeof(cbtag);
2752 for (
auto &&iov : iovecs) {
2753 copy = detail::register_data_copy<decvalueT>(copy,
nullptr,
true);
2754 parsec_ce_mem_reg_handle_t lreg;
2757 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2758 iov.num_bytes, &lreg, &lreg_size);
2759 auto lreg_ptr = std::shared_ptr<void>{lreg, [](
void *
ptr) {
2760 parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)
ptr;
2761 parsec_ce.mem_unregister(&memreg);
2763 int32_t lreg_size_i = lreg_size;
2764 std::memcpy(msg->bytes + pos, &lreg_size_i,
sizeof(lreg_size_i));
2765 pos +=
sizeof(lreg_size_i);
2766 std::memcpy(msg->bytes + pos, lreg, lreg_size);
2770 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
2776 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
2777 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
2778 pos +=
sizeof(fn_ptr);
2785 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2786 num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2788 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2789 size_t metadata_size =
sizeof(metadata);
2790 pos =
pack(metadata, msg->bytes, pos);
2792 handle_iovec_fn(iovs);
2796 pos =
pack(*value_ptr, msg->bytes, pos, copy);
2804 msg->tt_id.num_iovecs = num_iovecs;
2808 msg->tt_id.num_keys = 0;
2809 msg->tt_id.key_offset = pos;
2810 if constexpr (!ttg::meta::is_void_v<Key>) {
2811 size_t tmppos =
pack(key, msg->bytes, pos);
2813 msg->tt_id.num_keys = 1;
2816 parsec_taskpool_t *tp = world_impl.taskpool();
2817 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2818 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2820 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
2822 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2823 if(world.impl().profiling()) {
2824 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2827 #if defined(PARSEC_PROF_GRAPHER)
2832 if(orig_index >= 0) {
2833 snprintf(orig_str, 32,
"%d", orig_index);
2835 strncpy(orig_str,
"_", 32);
2837 snprintf(dest_str, 32,
"%lu", i);
2838 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2839 .flow_index = 0, .flow_datatype_mask = ~0 };
2840 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2841 .flow_index = 0, .flow_datatype_mask = ~0 };
2849 template <
int i,
typename Iterator,
typename Value>
2851 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2852 if(world.impl().profiling()) {
2853 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2856 parsec_task_t *task_ring =
nullptr;
2862 for (
auto it = begin; it != end; ++it) {
2863 set_arg_local_impl<i>(*it, value, copy, &task_ring);
2866 if (
nullptr != task_ring) {
2867 parsec_task_t *vp_task_ring[1] = { task_ring };
2868 __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2870 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2871 if(world.impl().profiling()) {
2872 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2877 template <std::
size_t i,
typename Key,
typename Value>
2878 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2881 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2883 auto np = world.size();
2884 int rank = world.rank();
2886 bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
2887 [&](
const Key &key) { return keymap(key) != rank; });
2890 using decvalueT = std::decay_t<Value>;
2893 std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
2894 std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](
const Key &a,
const Key &b)
mutable {
2895 int rank_a = keymap(a);
2896 int rank_b = keymap(b);
2898 int pos_a = (rank_a + np - rank) % np;
2899 int pos_b = (rank_b + np - rank) % np;
2900 return pos_a < pos_b;
2904 auto local_begin = keylist_sorted.end();
2905 auto local_end = keylist_sorted.end();
2907 int32_t num_iovs = 0;
2911 assert(
nullptr != copy);
2914 auto &world_impl = world.impl();
2915 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2920 bool inline_data =
can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
2921 msg->tt_id.inline_data = inline_data;
2923 std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
2924 auto handle_iovs_fn = [&](
auto&& iovs){
2928 for (
auto &&iov : iovs) {
2929 std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
2930 pos += iov.num_bytes;
2937 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
2938 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
2939 pos +=
sizeof(cbtag);
2941 for (
auto &&iov : iovs) {
2942 parsec_ce_mem_reg_handle_t lreg;
2944 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2945 iov.num_bytes, &lreg, &lreg_size);
2947 memregs.push_back(std::make_pair(
static_cast<int32_t
>(lreg_size),
2949 std::shared_ptr<void>{lreg, [](
void *
ptr) {
2950 parsec_ce_mem_reg_handle_t memreg =
2951 (parsec_ce_mem_reg_handle_t)
ptr;
2953 parsec_ce.mem_unregister(&memreg);
2963 auto metadata = descr.get_metadata(value);
2964 size_t metadata_size =
sizeof(metadata);
2965 pos =
pack(metadata, msg->bytes, pos);
2966 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(&value));
2967 num_iovs = std::distance(std::begin(iovs), std::end(iovs));
2968 memregs.reserve(num_iovs);
2969 handle_iovs_fn(iovs);
2973 pos =
pack(value, msg->bytes, pos, copy);
2979 msg->tt_id.num_iovecs = num_iovs;
2981 std::size_t save_pos = pos;
2983 parsec_taskpool_t *tp = world_impl.taskpool();
2984 for (
auto it = keylist_sorted.begin(); it < keylist_sorted.end(); ) {
2986 auto owner = keymap(*it);
2987 if (owner ==
rank) {
2991 std::find_if_not(++it, keylist_sorted.end(), [&](
const Key &key) { return keymap(key) == rank; });
3004 for (
int idx = 0; idx < num_iovs; ++idx) {
3007 std::shared_ptr<void> lreg_ptr;
3008 std::tie(lreg_size, lreg_ptr) = memregs[idx];
3009 std::memcpy(msg->bytes + pos, &lreg_size,
sizeof(lreg_size));
3010 pos +=
sizeof(lreg_size);
3011 std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3015 copy = detail::register_data_copy<valueT>(copy,
nullptr,
true);
3017 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
3023 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
3024 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
3025 pos +=
sizeof(fn_ptr);
3030 msg->tt_id.key_offset = pos;
3036 pos =
pack(*it, msg->bytes, pos);
3038 }
while (it < keylist_sorted.end() && keymap(*it) == owner);
3039 msg->tt_id.num_keys = num_keys;
3041 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3042 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3044 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3048 broadcast_arg_local<i>(local_begin, local_end, value);
3051 broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
3058 template <
typename Key,
typename... Ts,
size_t... Is,
size_t... Js>
3059 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>,
3060 std::index_sequence<Js...>,
const Key &key,
3061 const std::tuple<Ts...> &args) {
3062 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3063 constexpr
size_t js[] = {Js...};
3064 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3070 template <
typename Key,
typename... Ts,
size_t... Is>
3071 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
const Key &key,
3072 const std::tuple<Ts...> &args) {
3073 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3079 template <
typename Key = keyT,
typename... Ts,
size_t... Is,
size_t... Js>
3080 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3081 const std::tuple<Ts...> &args) {
3082 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3083 constexpr
size_t js[] = {Js...};
3084 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3090 template <
typename Key = keyT,
typename... Ts,
size_t... Is>
3091 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
3092 const std::tuple<Ts...> &args) {
3093 set_args(std::index_sequence_for<Ts...>{}, is, args);
3099 template <std::
size_t i>
3101 assert(std::get<i>(input_reducers) &&
"TT::set_static_argstream_size called on nonstreaming input terminal");
3102 assert(
size > 0 &&
"TT::set_static_argstream_size(key,size) called with size=0");
3104 this->
trace(world.rank(),
":",
get_name(),
": setting global stream size for terminal ", i);
3107 if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3109 throw std::runtime_error(
"TT::set_static_argstream_size called for a bounded stream");
3112 static_stream_goal[i] =
size;
3118 template <std::
size_t i,
typename Key>
3121 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3122 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3125 const auto owner = keymap(key);
3126 if (owner != world.rank()) {
3127 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : forwarding stream size for terminal ", i);
3129 auto &world_impl = world.impl();
3131 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3133 world_impl.rank(), 1);
3135 pos =
pack(key, msg->bytes, pos);
3137 parsec_taskpool_t *tp = world_impl.taskpool();
3138 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3139 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3140 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3143 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : setting stream size to ",
size,
" for terminal ", i);
3145 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3148 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3150 world.impl().increment_created();
3152 if( world.impl().dag_profiling() ) {
3153 #if defined(PARSEC_PROF_GRAPHER)
3154 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3158 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3168 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3170 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3179 template <std::
size_t i,
typename Key = keyT>
3182 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3183 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3186 const auto owner = keymap();
3187 if (owner != world.rank()) {
3188 ttg::trace(world.rank(),
":",
get_name(),
" : forwarding stream size for terminal ", i);
3190 auto &world_impl = world.impl();
3192 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3194 world_impl.rank(), 0);
3196 parsec_taskpool_t *tp = world_impl.taskpool();
3197 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3198 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3199 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3204 parsec_key_t hk = 0;
3207 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3209 world.impl().increment_created();
3211 if( world.impl().dag_profiling() ) {
3212 #if defined(PARSEC_PROF_GRAPHER)
3213 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3217 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3227 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3229 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3237 template <std::
size_t i,
typename Key>
3240 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3243 const auto owner = keymap(key);
3244 if (owner != world.rank()) {
3245 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": forwarding stream finalize for terminal ", i);
3247 auto &world_impl = world.impl();
3249 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3251 world_impl.rank(), 1);
3253 pos =
pack(key, msg->bytes, pos);
3254 parsec_taskpool_t *tp = world_impl.taskpool();
3255 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3256 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3257 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3260 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": finalizing stream for terminal ", i);
3262 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3267 " : error finalize called on stream that never received an input data: ", i);
3268 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3279 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3281 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3282 if (1 == c && (task->
streams[i].size >= 1)) {
3289 template <std::
size_t i,
bool key_is_
void = ttg::meta::is_
void_v<keyT>>
3292 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3295 const auto owner = keymap();
3296 if (owner != world.rank()) {
3297 ttg::trace(world.rank(),
":",
get_name(),
": forwarding stream finalize for terminal ", i);
3299 auto &world_impl = world.impl();
3301 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3303 world_impl.rank(), 0);
3304 parsec_taskpool_t *tp = world_impl.taskpool();
3305 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3306 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3307 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3312 auto hk =
static_cast<parsec_key_t
>(0);
3316 " : error finalize called on stream that never received an input data: ", i);
3317 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3328 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3330 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3331 if (1 == c && (task->
streams[i].size >= 1)) {
3341 auto check_parsec_data = [&](parsec_data_t*
data) {
3342 if (
data->owner_device != 0) {
3345 while (flowidx < MAX_PARAM_COUNT &&
3346 gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3353 if (flowidx == MAX_PARAM_COUNT) {
3354 throw std::runtime_error(
"Cannot add more than MAX_PARAM_COUNT flows to a task!");
3356 if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3359 gpu_task->flow_nb_elts[flowidx] =
data->nb_elts;
3362 ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3363 gpu_task->pushout |= 1<<flowidx;
3371 template <std::
size_t i,
typename Value,
typename RemoteCheckFn>
3372 std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3375 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
3376 static constexpr
const bool value_is_const = std::is_const_v<valueT>;
3383 if (
nullptr == copy) {
3388 bool need_pushout =
false;
3396 auto &reducer = std::get<i>(input_reducers);
3404 if constexpr (value_is_const) {
3408 need_pushout =
true;
3418 need_pushout =
true;
3425 need_pushout =
true;
3429 need_pushout =
true;
3436 need_pushout =
true;
3440 if (!need_pushout) {
3441 bool device_supported =
false;
3451 if (!device_supported) {
3452 need_pushout = remote_check();
3463 template <std::
size_t i,
typename Key,
typename Value>
3464 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3467 auto remote_check = [&](){
3469 int rank = world.rank();
3470 bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3471 [&](
const Key &key) { return keymap(key) != rank; });
3474 do_prepare_send<i>(value, remote_check);
3477 template <std::
size_t i,
typename Key,
typename Value>
3478 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3481 auto remote_check = [&](){
3483 int rank = world.rank();
3484 return (keymap() !=
rank);
3486 do_prepare_send<i>(value, remote_check);
3499 TT(
const TT &other) =
delete;
3500 TT &operator=(
const TT &other) =
delete;
3501 TT(
TT &&other) =
delete;
3502 TT &operator=(
TT &&other) =
delete;
3505 template <
typename terminalT, std::
size_t i>
3506 void register_input_callback(terminalT &input) {
3507 using valueT =
typename terminalT::value_type;
3508 if (input.is_pull_terminal) {
3514 if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3515 auto move_callback = [
this](
const keyT &key, valueT &&value) {
3516 set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3518 auto send_callback = [
this](
const keyT &key,
const valueT &value) {
3519 if constexpr (std::is_copy_constructible_v<valueT>) {
3520 set_arg<i, keyT, const valueT &>(key, value);
3523 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() +
" which is not copy constructible, std::move datum into send statement");
3526 auto broadcast_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3527 if constexpr (std::is_copy_constructible_v<valueT>) {
3528 broadcast_arg<i, keyT, valueT>(keylist, value);
3531 throw std::logic_error(std::string(
"TTG::PaRSEC: broadcast_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() +
" which is not copy constructible, broadcast is not possible with move-only type");
3534 auto prepare_send_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3535 prepare_send<i, keyT, valueT>(keylist, value);
3537 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3538 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3539 input.set_callback(send_callback, move_callback, broadcast_callback,
3540 setsize_callback, finalize_callback, prepare_send_callback);
3545 else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3546 auto send_callback = [
this](
const keyT &key) { set_arg<i, keyT, ttg::Void>(key,
ttg::Void{}); };
3547 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3548 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3549 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3558 else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3559 auto move_callback = [
this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3560 auto send_callback = [
this](
const valueT &value) {
3561 if constexpr (std::is_copy_constructible_v<valueT>) {
3562 set_arg<i, keyT, const valueT &>(value);
3565 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() +
" which is not copy constructible, std::move datum into send/broadcast statement");
3568 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3569 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3570 auto prepare_send_callback = [
this](
const valueT &value) {
3571 prepare_send<i, void>(value);
3573 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3578 else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3579 auto send_callback = [
this]() { set_arg<i, keyT, ttg::Void>(
ttg::Void{}); };
3580 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3581 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3582 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3592 template <std::size_t... IS>
3593 void register_input_callbacks(std::index_sequence<IS...>) {
3596 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3601 template <std::size_t... IS,
typename inedgesT>
3602 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3603 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3607 template <std::size_t... IS,
typename outedgesT>
3608 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3609 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3614 template <
typename input_terminals_tupleT, std::size_t... IS,
typename flowsT>
3615 void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3617 (*(
const_cast<std::remove_const_t<decltype(flows[IS]-
>flow_flags)> *>(&(flows[IS]->flow_flags))) =
3618 (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3619 : PARSEC_FLOW_ACCESS_RW),
3624 template <
typename input_terminals_tupleT,
typename flowsT>
3625 void initialize_flows(flowsT &&flows) {
3626 _initialize_flows<input_terminals_tupleT>(
3633 static int key_equal(parsec_key_t a, parsec_key_t b,
void *user_data) {
3634 if constexpr (std::is_same_v<keyT, void>) {
3637 keyT &ka = *(
reinterpret_cast<keyT *
>(a));
3638 keyT &kb = *(
reinterpret_cast<keyT *
>(b));
3643 static uint64_t key_hash(parsec_key_t k,
void *user_data) {
3644 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3645 if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3648 keyT &kk = *(
reinterpret_cast<keyT *
>(k));
3650 uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3655 static char *key_print(
char *buffer,
size_t buffer_size, parsec_key_t k,
void *user_data) {
3656 if constexpr (std::is_same_v<keyT, void>) {
3660 keyT kk = *(
reinterpret_cast<keyT *
>(k));
3661 std::stringstream iss;
3663 memset(buffer, 0, buffer_size);
3664 iss.get(buffer, buffer_size);
3669 static parsec_key_t make_key(
const parsec_taskpool_t *tp,
const parsec_assignment_t *as) {
3671 keyT *key = *(keyT**)&(as[2]);
3672 return reinterpret_cast<parsec_key_t
>(key);
3675 static char *parsec_ttg_task_snprintf(
char *buffer,
size_t buffer_size,
const parsec_task_t *parsec_task) {
3676 if(buffer_size == 0)
3679 if constexpr (ttg::meta::is_void_v<keyT>) {
3680 snprintf(buffer, buffer_size,
"%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3682 const task_t *task =
reinterpret_cast<const task_t*
>(parsec_task);
3683 std::stringstream ss;
3686 std::string keystr = ss.str();
3687 std::replace(keystr.begin(), keystr.end(),
'(',
':');
3688 std::replace(keystr.begin(), keystr.end(),
')',
':');
3690 snprintf(buffer, buffer_size,
"%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3695 #if defined(PARSEC_PROF_TRACE)
3696 static void *parsec_ttg_task_info(
void *dst,
const void *
data,
size_t size)
3698 const task_t *task =
reinterpret_cast<const task_t *
>(
data);
3700 if constexpr (ttg::meta::is_void_v<keyT>) {
3701 snprintf(
reinterpret_cast<char*
>(dst),
size,
"()");
3703 std::stringstream ss;
3705 snprintf(
reinterpret_cast<char*
>(dst),
size,
"%s", ss.str().c_str());
3711 parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3713 template<std::
size_t I>
3714 inline static void increment_data_version_impl(task_t *task) {
3715 if constexpr (!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
3716 if (task->copies[I] !=
nullptr){
3717 task->copies[I]->inc_current_version();
3722 template<std::size_t... Is>
3723 inline static void increment_data_versions(task_t *task, std::index_sequence<Is...>) {
3725 int junk[] = {0, (increment_data_version_impl<Is>(task), 0)...};
3729 static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3733 task_t *task = (task_t*)parsec_task;
3735 #ifdef TTG_HAVE_COROUTINE
3737 if (task->suspended_task_address) {
3739 #ifdef TTG_HAVE_DEVICE
3746 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3749 auto dev_data = dev_task.promise();
3752 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT);
3755 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3758 dev_data.do_sends();
3764 task->suspended_task_address =
nullptr;
3769 for (
int i = 0; i < task->data_count; i++) {
3770 detail::ttg_data_copy_t *copy = task->
copies[i];
3771 if (
nullptr == copy)
continue;
3773 task->copies[i] =
nullptr;
3775 return PARSEC_HOOK_RETURN_DONE;
3779 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3780 typename priomapT = ttg::detail::default_priomap<keyT>>
3781 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3782 ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3783 :
ttg::
TTBase(name, numinedges, numouts)
3786 , keymap(std::is_same<keymapT,
ttg::detail::default_keymap<keyT>>::value
3787 ? decltype(keymap)(
ttg::detail::default_keymap<keyT>(world))
3788 : decltype(keymap)(std::forward<keymapT>(keymap_)))
3789 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3791 if (innames.size() != numinedges)
throw std::logic_error(
"ttg_parsec::TT: #input names != #input terminals");
3792 if (outnames.size() != numouts)
throw std::logic_error(
"ttg_parsec::TT: #output names != #output terminals");
3794 auto &world_impl = world.
impl();
3795 world_impl.register_op(
this);
3797 if constexpr (numinedges == numins) {
3805 register_input_callbacks(std::make_index_sequence<numinedges>{});
3808 memset(&
self, 0,
sizeof(parsec_task_class_t));
3810 self.name = strdup(
get_name().c_str());
3812 self.nb_parameters = 0;
3815 self.nb_flows = MAX_PARAM_COUNT;
3818 if( world_impl.profiling() ) {
3820 self.nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3822 self.nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3833 self.make_key = make_key;
3834 self.key_functions = &tasks_hash_fcts;
3835 self.task_snprintf = parsec_ttg_task_snprintf;
3837 #if defined(PARSEC_PROF_TRACE)
3838 self.profile_info = &parsec_ttg_task_info;
3841 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
3846 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3847 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
3848 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3849 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3850 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3851 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3852 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3854 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3855 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_HIP;
3856 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3857 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_hip<TT>;
3859 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3860 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3861 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3862 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3864 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3865 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3866 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3867 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3869 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3870 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3871 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3874 self.incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
3875 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CPU;
3876 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
3877 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook<TT>;
3878 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3879 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3880 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3884 self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3885 self.complete_execution = complete_task_and_release;
3887 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3888 parsec_flow_t *flow =
new parsec_flow_t;
3889 flow->name = strdup((std::string(
"flow in") + std::to_string(i)).c_str());
3890 flow->sym_type = PARSEC_SYM_INOUT;
3893 flow->dep_in[0] = NULL;
3894 flow->dep_out[0] = NULL;
3895 flow->flow_index = i;
3896 flow->flow_datatype_mask = ~0;
3897 *((parsec_flow_t **)&(
self.
in[i])) = flow;
3902 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3903 parsec_flow_t *flow =
new parsec_flow_t;
3904 flow->name = strdup((std::string(
"flow out") + std::to_string(i)).c_str());
3905 flow->sym_type = PARSEC_SYM_INOUT;
3906 flow->flow_flags = PARSEC_FLOW_ACCESS_READ;
3907 flow->dep_in[0] = NULL;
3908 flow->dep_out[0] = NULL;
3909 flow->flow_index = i;
3910 flow->flow_datatype_mask = (1 << i);
3911 *((parsec_flow_t **)&(
self.
out[i])) = flow;
3916 self.dependencies_goal = numins;
3919 auto *context = world_impl.context();
3920 for (
int i = 0; i < context->nb_vp; i++) {
3921 nbthreads += context->virtual_processes[i]->nb_cores;
3924 parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t),
sizeof(
task_t),
3925 offsetof(parsec_task_t, mempool_owner), nbthreads);
3931 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3932 typename priomapT = ttg::detail::default_priomap<keyT>>
3933 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3936 std::forward<priomapT>(priomap)) {}
3938 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3939 typename priomapT = ttg::detail::default_priomap<keyT>>
3941 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
ttg::World world,
3942 keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
3943 :
TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
3944 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
3945 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
3947 if constexpr (numinedges > 0) {
3948 register_input_callbacks(std::make_index_sequence<numinedges>{});
3951 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3952 typename priomapT = ttg::detail::default_priomap<keyT>>
3954 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3957 std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
3961 if(
nullptr !=
self.name ) {
3962 free((
void*)
self.name);
3963 self.name =
nullptr;
3966 for (std::size_t i = 0; i < numins; ++i) {
3967 if (inpute_reducers_taskclass[i] !=
nullptr) {
3968 std::free(inpute_reducers_taskclass[i]);
3969 inpute_reducers_taskclass[i] =
nullptr;
3977 ttT *op = (
ttT *)cb_data;
3978 if constexpr (!ttg::meta::is_void_v<keyT>) {
3979 std::cout <<
"Left over task " << op->
get_name() <<
" " << task->
key << std::endl;
3981 std::cout <<
"Left over task " << op->
get_name() << std::endl;
3999 parsec_mempool_destruct(&mempools);
4002 free((__parsec_chore_t *)
self.incarnations);
4003 for (
int i = 0; i < MAX_PARAM_COUNT; i++) {
4004 if (NULL !=
self.
in[i]) {
4005 free(
self.
in[i]->name);
4007 self.in[i] =
nullptr;
4009 if (NULL !=
self.
out[i]) {
4010 free(
self.
out[i]->name);
4012 self.out[i] =
nullptr;
4015 world.
impl().deregister_op(
this);
4025 template <std::
size_t i,
typename Reducer>
4028 std::get<i>(input_reducers) = reducer;
4030 parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4031 if (
nullptr == tc) {
4032 tc = (parsec_task_class_t *)std::calloc(1,
sizeof(*tc));
4033 inpute_reducers_taskclass[i] = tc;
4035 tc->name = strdup((
get_name() + std::string(
" reducer ") + std::to_string(i)).c_str());
4037 tc->nb_parameters = 0;
4039 tc->nb_flows = numflows;
4041 auto &world_impl = world.
impl();
4043 if( world_impl.profiling() ) {
4045 tc->nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4047 tc->nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4058 tc->make_key = make_key;
4059 tc->key_functions = &tasks_hash_fcts;
4060 tc->task_snprintf = parsec_ttg_task_snprintf;
4062 #if defined(PARSEC_PROF_TRACE)
4063 tc->profile_info = &parsec_ttg_task_info;
4066 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
4071 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
4072 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
4073 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
4075 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_CPU;
4076 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
4077 ((__parsec_chore_t *)
self.incarnations)[1].hook =
detail::hook;
4078 ((__parsec_chore_t *)
self.incarnations)[2].type = PARSEC_DEV_NONE;
4079 ((__parsec_chore_t *)
self.incarnations)[2].evaluate = NULL;
4080 ((__parsec_chore_t *)
self.incarnations)[2].hook = NULL;
4084 tc->incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
4085 ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4086 ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4087 ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4088 ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4089 ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4090 ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4094 tc->release_task = &parsec_release_task_to_mempool;
4095 tc->complete_execution = NULL;
4106 template <std::
size_t i,
typename Reducer>
4108 set_input_reducer<i>(std::forward<Reducer>(reducer));
4109 set_static_argstream_size<i>(
size);
4114 template <std::
size_t i>
4115 std::tuple_element_t<i, input_terminals_type> *
in() {
4116 return &std::get<i>(input_terminals);
4121 template <std::
size_t i>
4122 std::tuple_element_t<i, output_terminalsT> *
out() {
4123 return &std::get<i>(output_terminals);
4127 template <
typename Key = keyT>
4128 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4131 if constexpr(!std::is_same_v<Key, key_type>) {
4136 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4138 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4139 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4144 template <
typename Key = keyT>
4145 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4149 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
4151 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4152 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4156 template <
typename Key = keyT>
4157 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4161 if constexpr(!std::is_same_v<Key, key_type>) {
4166 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4167 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4172 template <
typename Key = keyT>
4173 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke() {
4176 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4177 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4182 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4189 template<
typename Key,
typename Arg,
typename... Args, std::size_t I, std::size_t... Is>
4190 void invoke_arglist(std::index_sequence<I, Is...>,
const Key& key, Arg&& arg, Args&&... args) {
4191 using arg_type = std::decay_t<Arg>;
4192 if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4197 copy->reset_readers();
4199 set_arg_impl<I>(key, val, copy);
4201 if constexpr (std::is_rvalue_reference_v<Arg>) {
4205 }
else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4206 set_arg<I>(key, std::forward<Arg>(arg));
4208 if constexpr (
sizeof...(Is) > 0) {
4210 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4216 template <
typename Key = keyT,
typename Arg,
typename... Args>
4217 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4218 const Key &key, Arg&& arg, Args&&... args) {
4219 static_assert(
sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4220 "Number of arguments to invoke must match the number of task inputs.");
4223 invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4224 std::forward<Arg>(arg), std::forward<Args>(args)...);
4227 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4228 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4232 m_defer_writer = value;
4236 return m_defer_writer;
4241 world.
impl().register_tt_profiling(
this);
4251 template <
typename Keymap>
4262 template <
typename Priomap>
4264 priomap = std::forward<Priomap>(pm);
4272 template<
typename Devicemap>
4277 devicemap = std::forward<Devicemap>(dm);
4280 devicemap = [=](
const keyT& key) {
4288 throw std::runtime_error(
"Unknown device type!");
4301 MPI_Comm_rank(MPI_COMM_WORLD, &
rank);
4304 auto &world_impl = world.
impl();
4308 auto tp = world_impl.taskpool();
4314 std::vector<static_set_arg_fct_arg_t> tmp;
4315 for (
auto it = se.first; it != se.second;) {
4317 tmp.push_back(it->second);
4322 for (
auto it : tmp) {
4325 std::get<1>(it),
", ", std::get<2>(it),
")");
4326 int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
4327 std::get<0>(it), NULL);
4329 free(std::get<1>(it));
4352 bool do_release =
true;
4358 : copy_to_remove(h.copy_to_remove)
4360 h.copy_to_remove =
nullptr;
4366 std::swap(copy_to_remove, h.copy_to_remove);
4371 if (
nullptr != copy_to_remove) {
4379 template <
typename Value>
4380 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&>
operator()(Value &&value) {
4381 constexpr
auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4382 static_assert(value_is_rvref ||
4383 std::is_copy_constructible_v<std::decay_t<Value>>,
4384 "Data sent without being moved must be copy-constructible!");
4387 if (
nullptr == caller) {
4388 ttg::print(
"ERROR: ttg::send or ttg::broadcast called outside of a task!\n");
4390 using value_type = std::remove_reference_t<Value>;
4393 value_type *value_ptr = &value;
4394 if (
nullptr == copy) {
4402 value_ptr =
reinterpret_cast<value_type *
>(copy->
get_ptr());
4403 copy_to_remove = copy;
4405 if constexpr (value_is_rvref) {
4413 if constexpr (value_is_rvref)
4414 return std::move(*value_ptr);
4419 template<
typename Value>
4422 if (
nullptr == caller) {
4423 ttg::print(
"ERROR: ttg::send or ttg::broadcast called outside of a task!\n");
4427 if (
nullptr == copy) {
4432 copy_to_remove = copy;
4439 template <
typename Value>
4441 static_assert(std::is_copy_constructible_v<std::decay_t<Value>>,
4442 "Data sent without being moved must be copy-constructible!");
4444 if (
nullptr == caller) {
4445 ttg::print(
"ERROR: ttg::send or ttg::broadcast called outside of a task!\n");
4449 const Value *value_ptr = &value;
4450 if (
nullptr == copy) {
4458 value_ptr =
reinterpret_cast<Value *
>(copy->
get_ptr());
4459 copy_to_remove = copy;
#define TTG_OP_ASSERT_EXECUTABLE()
Edge is used to connect In and Out terminals.
A base class for all template tasks.
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
auto get_instance_id() const
virtual void make_executable()=0
Marks this executable.
const std::string & get_name() const
Gets the name of this operation.
void register_input_terminals(terminalsT &terms, const namesT &names)
const TTBase * ttg_ptr() const
void register_output_terminals(terminalsT &terms, const namesT &names)
A complete version of void.
Base class for implementation-specific Worlds.
WorldImplBase(int size, int rank)
bool is_valid(void) const
Represents a device in a specific execution space.
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
void finalize_argstream_from_msg(void *data, std::size_t size)
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
std::tuple_element_t< i, input_terminals_type > * in()
static constexpr bool derived_has_hip_op()
void set_keymap(Keymap &&km)
keymap setter
decltype(keymap) const & get_keymap() const
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
void print_incomplete_tasks()
void set_arg_from_msg(void *data, std::size_t size)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
static constexpr bool derived_has_level_zero_op()
parsec_thread_mempool_t * get_task_mempool(void)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
task_t * create_new_task(const Key &key)
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
bool get_defer_writer(bool value)
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
static void ht_iter_cb(void *item, void *cb_data)
const auto & get_output_terminals() const
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
void set_input_reducer(Reducer &&reducer, std::size_t size)
output_terminalsT output_terminals_type
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
void set_input_reducer(Reducer &&reducer)
decltype(priomap) const & get_priomap() const
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
void copy_mark_pushout(detail::ttg_data_copy_t *copy)
void get_from_pull_msg(void *data, std::size_t size)
static constexpr int numinvals
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
ttg::World get_world() const override final
void set_defer_writer(bool value)
void make_executable() override
Marks this executable.
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
virtual void release() override
void set_devicemap(Devicemap &&dm)
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
void register_static_op_function(void)
static resultT get(InTuple &&intuple)
static auto & get(InTuple &&intuple)
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
actual_input_tuple_type input_args_type
void set_priomap(Priomap &&pm)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
std::tuple_element_t< i, output_terminalsT > * out()
void argstream_set_size_from_msg(void *data, std::size_t size)
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
void set_static_argstream_size(std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
static constexpr bool derived_has_device_op()
static constexpr const ttg::Runtime runtime
static constexpr bool derived_has_cuda_op()
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
virtual void execute() override
static constexpr int parsec_ttg_rma_tag()
void decrement_inflight_msg()
WorldImpl & operator=(const WorldImpl &other)=delete
const ttg::Edge & ctl_edge() const
void increment_inflight_msg()
WorldImpl(const WorldImpl &other)=delete
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs > *t)
virtual void profile_off() override
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
bool mpi_support(ttg::ExecutionSpace space)
virtual bool profiling() override
virtual void dag_off() override
virtual void fence_impl(void) override
virtual void dag_on(const std::string &filename) override
static constexpr int parsec_ttg_tag()
virtual void final_task() override
virtual void destroy() override
virtual void profile_on() override
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
auto * execution_stream()
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
bool complete_transfer(void)
constexpr auto data(C &c) -> decltype(c.data())
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
std::size_t max_inline_size
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
int ttg_device_to_parsec_device(const ttg::device::Device &device)
const parsec_symbol_t parsec_taskclass_param1
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
constexpr const int PARSEC_TTG_MAX_AM_SIZE
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
const parsec_symbol_t parsec_taskclass_param2
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * create_new_datacopy(Value &&value)
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
void release_data_copy(ttg_data_copy_t *copy)
void transfer_ownership_impl(ttg_data_copy_t *copy, int device)
const parsec_symbol_t parsec_taskclass_param3
const parsec_symbol_t parsec_taskclass_param0
this contains PaRSEC-based TTG functionality
void ttg_fence(ttg::World world)
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
std::mutex static_map_mutex
void ttg_register_callback(ttg::World world, Callback &&callback)
ttg::Edge & ttg_ctl_edge(ttg::World world)
void make_executable_hook(ttg::World &)
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
ttg::World ttg_default_execution_context()
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
void ttg_execute(ttg::World world)
void ttg_sum(ttg::World world, double &value)
top-level TTG namespace contains runtime-neutral functionality
ExecutionSpace
denotes task execution space
int size(World world=default_execution_context())
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
World default_execution_context()
Accesses the default backend's default execution context.
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
bool tracing()
returns whether tracing is enabled
int rank(World world=default_execution_context())
ttg::World & get_default_world()
void trace(const T &t, const Ts &... ts)
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_DATA_FLAG_REGISTERED
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
value_copy_handler(value_copy_handler &&h)
value_copy_handler & operator=(value_copy_handler &&h)
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
value_copy_handler & operator=(const value_copy_handler &h)=delete
value_copy_handler()=default
Computes hash values for objects of type T.
task that can be resumed after some events occur
parsec_hash_table_t tasks_table
parsec_gpu_task_t * gpu_task
static constexpr std::size_t max_payload_size
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
unsigned char bytes[max_payload_size]
parsec_task_t parsec_task
ttg_data_copy_t ** copies
ttg_parsec_data_flags data_flags
parsec_hash_table_item_t tt_ht_item
std::array< stream_info_t, num_streams > streams
ttg_data_copy_t * copies[num_copies]
lvalue_reference_type value_ref
static void drop_all_ptr()
static void release_task(parsec_ttg_task_base_t *task_base)
static constexpr int mutable_tag
parsec_task_t * get_next_task() const
ttg::span< ttg::iovec > iovec_span()
virtual void * get_ptr()=0
void transfer_ownership(int access, int device=0)
void inc_current_version()
void foreach_parsec_data(Fn &&fn)
iovec_iterator iovec_begin()
void set_next_task(parsec_task_t *task)
iovec_iterator iovec_end()
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER