2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
13 #define TTG_PARSEC_DEFER_WRITER false
15 #include "ttg/config.h"
20 #include "../../ttg.h"
40 #ifdef TTG_HAVE_DEVICE
58 #include <experimental/type_traits>
73 #if defined(TTG_HAVE_MPI)
75 #if defined(TTG_HAVE_MPIEXT)
82 #include <parsec/class/parsec_hash_table.h>
83 #include <parsec/data_internal.h>
84 #include <parsec/execution_stream.h>
85 #include <parsec/interfaces/interface.h>
86 #include <parsec/mca/device/device.h>
87 #include <parsec/parsec_comm_engine.h>
88 #include <parsec/parsec_internal.h>
89 #include <parsec/scheduling.h>
90 #include <parsec/remote_dep.h>
92 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #include <parsec/mca/device/cuda/device_cuda.h>
95 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
96 #include <parsec/mca/device/hip/device_hip.h>
98 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 #include <parsec/mca/device/level_zero/device_level_zero.h>
102 #include <parsec/mca/device/device_gpu.h>
103 #if defined(PARSEC_PROF_TRACE)
104 #include <parsec/profiling.h>
105 #undef PARSEC_TTG_PROFILE_BACKEND
106 #if defined(PARSEC_PROF_GRAPHER)
107 #include <parsec/parsec_prof_grapher.h>
113 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
114 #include <unordered_set>
125 #include <boost/type_index.hpp>
127 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
151 uint64_t
op_id = std::numeric_limits<uint64_t>::max();
172 static void unregister_parsec_tags(
void *_);
185 uint32_t taskpool_id,
190 :
tt_id(fn_id, taskpool_id,
tt_id, param_id, sender, num_keys)
196 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag,
void *
data,
long unsigned int size,
197 int src_rank,
void *obj) {
199 parsec_taskpool_t *tp = NULL;
201 uint64_t op_id = msg->
op_id;
208 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
209 static_set_arg_fct = op_pair.first;
210 static_set_arg_fct(
data,
size, op_pair.second);
211 tp->tdm.module->incoming_message_end(tp, NULL);
213 }
catch (
const std::out_of_range &e) {
214 void *data_cpy = malloc(
size);
215 assert(data_cpy != 0);
218 ", ", op_id,
", ", data_cpy,
", ",
size,
")");
225 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
226 int src,
void *cb_data);
229 static bool im =
false;
240 bool _task_profiling;
242 mpi_space_support = {
true,
false,
false};
244 int query_comm_size() {
246 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
250 int query_comm_rank() {
252 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
256 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine,
void *user_data)
262 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine,
void *user_data)
269 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
270 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
271 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
272 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
275 WorldImpl(
int *argc,
char **argv[],
int ncores, parsec_context_t *c =
nullptr)
278 , own_ctx(c == nullptr)
279 #if defined(PARSEC_PROF_TRACE)
280 , profiling_array(nullptr)
281 , profiling_array_size(0)
283 , _dag_profiling(false)
284 , _task_profiling(false)
287 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
291 #
if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
292 || MPIX_Query_cuda_support()
299 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
300 || MPIX_Query_hip_support()
306 #if defined(PARSEC_PROF_TRACE)
307 if(parsec_profile_enabled) {
309 #if defined(PARSEC_TTG_PROFILE_BACKEND)
310 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_SET_ARG_IMPL",
"fill:000000", 0, NULL,
311 (
int*)&parsec_ttg_profile_backend_set_arg_start,
312 (
int*)&parsec_ttg_profile_backend_set_arg_end);
313 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_BCAST_ARG_IMPL",
"fill:000000", 0, NULL,
314 (
int*)&parsec_ttg_profile_backend_bcast_arg_start,
315 (
int*)&parsec_ttg_profile_backend_bcast_arg_end);
316 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_DATACOPY",
"fill:000000",
317 sizeof(
size_t),
"size{int64_t}",
318 (
int*)&parsec_ttg_profile_backend_allocate_datacopy,
319 (
int*)&parsec_ttg_profile_backend_free_datacopy);
324 if( NULL != parsec_ce.tag_register) {
338 assert(
nullptr == tpool);
339 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
340 tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
342 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
343 tpool->taskpool_name = strdup(
"TTG Taskpool");
344 parsec_taskpool_reserve_id(tpool);
346 tpool->devices_index_mask = 0;
347 for(
int i = 0; i < (int)parsec_nb_devices; i++) {
348 parsec_device_module_t *device = parsec_mca_device_get(i);
349 if( NULL == device )
continue;
350 tpool->devices_index_mask |= (1 << device->device_index);
353 #ifdef TTG_USE_USER_TERMDET
354 parsec_termdet_open_module(tpool,
"user_trigger");
356 parsec_termdet_open_dyn_module(tpool);
364 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
367 #if defined(PARSEC_PROF_TRACE)
368 tpool->profiling_array = profiling_array;
377 parsec_taskpool_started =
false;
397 MPI_Comm
comm()
const {
return MPI_COMM_WORLD; }
400 if (!parsec_taskpool_started) {
401 parsec_enqueue(ctx, tpool);
402 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
403 tpool->tdm.module->taskpool_ready(tpool);
404 [[maybe_unused]]
auto ret = parsec_context_start(ctx);
406 parsec_taskpool_started =
true;
411 #if defined(PARSEC_PROF_TRACE)
415 tpool->profiling_array =
nullptr;
417 assert(NULL != tpool->tdm.monitor);
418 tpool->tdm.module->unmonitor_taskpool(tpool);
419 parsec_taskpool_free(tpool);
425 if (parsec_taskpool_started) {
427 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
428 ttg::trace(
"ttg_parsec(", this->
rank(),
"): final waiting for completion");
430 parsec_context_wait(ctx);
432 parsec_taskpool_wait(tpool);
438 unregister_parsec_tags(
nullptr);
440 parsec_context_at_fini(unregister_parsec_tags,
nullptr);
442 #if defined(PARSEC_PROF_TRACE)
443 if(
nullptr != profiling_array) {
444 free(profiling_array);
445 profiling_array =
nullptr;
446 profiling_array_size = 0;
449 if (own_ctx) parsec_fini(&ctx);
465 virtual void dag_on(
const std::string &filename)
override {
466 #if defined(PARSEC_PROF_GRAPHER)
467 if(!_dag_profiling) {
469 size_t len = strlen(filename.c_str())+32;
470 char ext_filename[len];
471 snprintf(ext_filename, len,
"%s-%d.dot", filename.c_str(),
rank());
472 parsec_prof_grapher_init(ctx, ext_filename);
473 _dag_profiling =
true;
476 ttg::print(
"Error: requested to create '", filename,
"' to create a DAG of tasks,\n"
477 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
482 #if defined(PARSEC_PROF_GRAPHER)
484 parsec_prof_grapher_fini();
485 _dag_profiling =
false;
491 #if defined(PARSEC_PROF_TRACE)
492 _task_profiling =
false;
497 #if defined(PARSEC_PROF_TRACE)
498 _task_profiling =
true;
502 virtual bool profiling()
override {
return _task_profiling; }
505 return mpi_space_support[
static_cast<std::size_t
>(space)];
509 #ifdef TTG_USE_USER_TERMDET
510 if(parsec_taskpool_started) {
512 parsec_taskpool_started =
false;
517 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs = ttg::typelist<>>
519 #if defined(PARSEC_PROF_TRACE)
520 std::stringstream ss;
521 build_composite_name_rec(t->
ttg_ptr(), ss);
528 #if defined(PARSEC_PROF_TRACE)
529 void build_composite_name_rec(
const ttg::TTBase *t, std::stringstream &ss) {
532 build_composite_name_rec(t->
ttg_ptr(), ss);
536 void register_new_profiling_event(
const char *name,
int position) {
537 if(2*position >= profiling_array_size) {
538 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
539 profiling_array = (
int*)realloc((
void*)profiling_array,
540 new_profiling_array_size *
sizeof(int));
541 memset((
void*)&profiling_array[profiling_array_size], 0,
sizeof(
int)*(new_profiling_array_size - profiling_array_size));
542 profiling_array_size = new_profiling_array_size;
543 tpool->profiling_array = profiling_array;
546 assert(0 == tpool->profiling_array[2*position]);
547 assert(0 == tpool->profiling_array[2*position+1]);
551 parsec_profiling_add_dictionary_keyword(name,
"fill:000000", 64,
"key{char[64]}",
552 (
int*)&tpool->profiling_array[2*position],
553 (
int*)&tpool->profiling_array[2*position+1]);
559 if (!parsec_taskpool_started) {
560 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool has not been started, fence is a simple MPI_Barrier");
564 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool is ready for completion");
566 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
568 parsec_taskpool_wait(tpool);
581 parsec_context_t *ctx =
nullptr;
582 bool own_ctx =
false;
583 parsec_taskpool_t *tpool =
nullptr;
584 bool parsec_taskpool_started =
false;
585 #if defined(PARSEC_PROF_TRACE)
586 int *profiling_array;
587 std::size_t profiling_array_size;
591 static void unregister_parsec_tags(
void *_pidx)
593 if(NULL != parsec_ce.tag_unregister) {
602 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
610 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
618 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
626 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
636 if (task ==
nullptr ||
ptr ==
nullptr) {
641 if (NULL != copy && copy->get_ptr() ==
ptr) {
651 if (task ==
nullptr ||
ptr ==
nullptr) {
656 if (NULL != copy && copy->get_ptr() ==
ptr) {
664 if (task ==
nullptr || copy ==
nullptr) {
668 if (MAX_PARAM_COUNT < task->data_count) {
669 throw std::logic_error(
"Too many data copies, check MAX_PARAM_COUNT!");
681 if (copy == task->
copies[i]) {
691 task->
copies[i] =
nullptr;
695 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
696 #warning "ttg::PaRSEC enables data copy tracking"
697 static std::unordered_set<ttg_data_copy_t *> pending_copies;
698 static std::mutex pending_copies_mutex;
700 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
701 static int64_t parsec_ttg_data_copy_uid = 0;
704 template <
typename Value>
706 using value_type = std::decay_t<Value>;
709 std::is_constructible_v<value_type, decltype(value)>) {
710 copy =
new value_type(std::forward<Value>(value));
715 throw std::logic_error(
"Trying to copy-construct data that is not copy-constructible!");
717 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
720 copy->size =
sizeof(Value);
721 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
723 static_cast<uint64_t
>(copy->uid),
724 PROFILE_OBJECT_ID_NULL, ©->size,
725 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
728 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
730 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
731 auto rc = pending_copies.insert(copy);
732 assert(std::get<1>(rc));
739 template <std::size_t... IS,
typename Key = keyT>
741 int junk[] = {0, (invoke_pull_terminal<IS>(
742 std::get<IS>(input_terminals), key, task),
748 template<
typename TT, std::
size_t I>
750 if constexpr(!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
756 template<
typename TT, std::size_t... Is>
759 int junk[] = {0, (transfer_ownership_impl<TT, Is>(me->
copies[Is], device), 0)...};
763 template<
typename TT>
764 inline parsec_hook_return_t
hook(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
766 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
769 return me->template invoke_op<ttg::ExecutionSpace::Host>();
772 template<
typename TT>
773 inline parsec_hook_return_t
hook_cuda(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
776 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
778 std::cerr <<
"CUDA hook called without having a CUDA op!" << std::endl;
779 return PARSEC_HOOK_RETURN_ERROR;
783 template<
typename TT>
784 inline parsec_hook_return_t
hook_hip(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
787 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
789 std::cerr <<
"HIP hook called without having a HIP op!" << std::endl;
790 return PARSEC_HOOK_RETURN_ERROR;
794 template<
typename TT>
795 inline parsec_hook_return_t
hook_level_zero(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
798 return me->template invoke_op<ttg::ExecutionSpace::L0>();
800 std::cerr <<
"L0 hook called without having a L0 op!" << std::endl;
801 return PARSEC_HOOK_RETURN_ERROR;
806 template<
typename TT>
807 inline parsec_hook_return_t
evaluate_cuda(
const parsec_task_t *parsec_task) {
810 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
812 return PARSEC_HOOK_RETURN_NEXT;
816 template<
typename TT>
817 inline parsec_hook_return_t
evaluate_hip(
const parsec_task_t *parsec_task) {
820 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
822 return PARSEC_HOOK_RETURN_NEXT;
826 template<
typename TT>
830 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
832 return PARSEC_HOOK_RETURN_NEXT;
837 template <
typename KeyT,
typename ActivationCallbackT>
839 std::vector<KeyT> _keylist;
840 std::atomic<int> _outstanding_transfers;
841 ActivationCallbackT _cb;
846 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
849 int left = --_outstanding_transfers;
851 _cb(std::move(_keylist), _copy);
858 template <
typename ActivationT>
859 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
860 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl,
size_t size,
int remote,
862 parsec_ce.mem_unregister(&lreg);
863 ActivationT *activation =
static_cast<ActivationT *
>(cb_data);
864 if (activation->complete_transfer()) {
867 return PARSEC_SUCCESS;
870 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
871 int src,
void *cb_data) {
872 std::intptr_t *fn_ptr =
static_cast<std::intptr_t *
>(msg);
873 std::function<void(
void)> *fn =
reinterpret_cast<std::function<
void(
void)
> *>(*fn_ptr);
876 return PARSEC_SUCCESS;
879 template <
typename FuncT>
880 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
881 int src,
void *cb_data) {
882 std::intptr_t *iptr =
static_cast<std::intptr_t *
>(msg);
883 FuncT *fn_ptr =
reinterpret_cast<FuncT *
>(*iptr);
886 return PARSEC_SUCCESS;
900 }
else if (readers == 1) {
906 if (1 == readers || readers == copy->
mutable_tag) {
907 std::atomic_thread_fence(std::memory_order_acquire);
920 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
922 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
923 size_t rc = pending_copies.erase(copy);
927 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
931 static_cast<uint64_t
>(copy->uid),
932 PROFILE_OBJECT_ID_NULL, ©->size,
933 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
941 template <
typename Value>
944 bool replace =
false;
946 assert(readers != 0);
950 bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->
defer_writer;
976 }
else if (!readonly) {
990 if (1 == copy_in->
num_readers() && !defer_writer) {
997 std::atomic_thread_fence(std::memory_order_release);
1011 if (NULL == copy_res) {
1013 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1020 for (
int i = 0; i < deferred_op->
data_count; ++i) {
1021 if (deferred_op->
copies[i] == copy_in) {
1022 deferred_op->
copies[i] = new_copy;
1035 copy_res = new_copy;
1039 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<Value>>().pretty_name() +
" but the type is not copyable");
1048 if (
detail::initialized_mpi())
throw std::runtime_error(
"ttg_parsec::ttg_initialize: can only be called once");
1051 int mpi_initialized;
1052 MPI_Initialized(&mpi_initialized);
1053 if (!mpi_initialized) {
1055 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
1057 throw std::runtime_error(
"ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1070 for (
int i = 0; i < parsec_nb_devices; ++i) {
1071 bool is_gpu = parsec_mca_device_is_gpu(i);
1075 throw std::runtime_error(
"PaRSEC: Found non-GPU device in GPU ID range!");
1080 const char* ttg_max_inline_cstr = std::getenv(
"TTG_MAX_INLINE");
1081 if (
nullptr != ttg_max_inline_cstr) {
1082 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
1088 bool all_peer_access =
true;
1090 for (
int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1091 parsec_device_module_t *device = parsec_mca_device_get(i);
1092 if (PARSEC_DEV_IS_GPU(device->type)) {
1093 parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)device;
1094 for (
int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1095 if (PARSEC_DEV_IS_GPU(device->type)) {
1096 all_peer_access = all_peer_access && (gpu_device->peer_access_mask & (1<<j));
1110 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1119 template <
typename T>
1121 world.
impl().register_ptr(
ptr);
1124 template <
typename T>
1126 world.
impl().register_ptr(std::move(
ptr));
1130 world.
impl().register_status(status_ptr);
1133 template <
typename Callback>
1135 world.
impl().register_callback(std::forward<Callback>(callback));
1141 double result = 0.0;
1142 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.
impl().comm());
1147 MPI_Barrier(world.
impl().comm());
1152 template <
typename T>
1155 if (world.
rank() == source_rank) {
1158 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.
impl().comm());
1160 unsigned char *buf =
new unsigned char[BUFLEN];
1161 if (world.
rank() == source_rank) {
1164 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.
impl().comm());
1165 if (world.
rank() != source_rank) {
1178 parsec_task_class_t
self;
1183 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs>
1187 static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1188 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1190 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1193 static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1194 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1195 static_assert((ttg::meta::none_has_reference_v<input_valueTs>),
"Input typelist cannot contain reference types");
1196 static_assert(ttg::meta::is_none_Void_v<input_valueTs>,
"ttg::Void is for internal use only, do not use it");
1198 parsec_mempool_t mempools;
/// Detection alias naming the declared type of `T::have_cuda_op`.
/// Used with ttg::meta::is_detected_v: if T has no such member, substitution
/// fails and the CUDA-op check falls back to its default.
template <class T>
using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
/// Detection alias naming the declared type of `T::have_hip_op`.
/// Used with ttg::meta::is_detected_v: if T has no such member, substitution
/// fails and the HIP-op check falls back to its default.
template <class T>
using have_hip_op_non_type_t = decltype(T::have_hip_op);
/// Detection alias naming the declared type of `T::have_level_zero_op`.
/// Used with ttg::meta::is_detected_v: if T has no such member, substitution
/// fails and the Level-Zero-op check falls back to its default.
template <class T>
using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1212 static constexpr
int numinedges = std::tuple_size_v<input_tuple_type>;
1213 static constexpr
int numins = std::tuple_size_v<actual_input_tuple_type>;
1214 static constexpr
int numouts = std::tuple_size_v<output_terminalsT>;
1215 static constexpr
int numflows = std::max(numins, numouts);
1220 if constexpr (ttg::meta::is_detected_v<have_cuda_op_non_type_t, derivedT>) {
1221 return derivedT::have_cuda_op;
1229 if constexpr (ttg::meta::is_detected_v<have_hip_op_non_type_t, derivedT>) {
1230 return derivedT::have_hip_op;
1238 if constexpr (ttg::meta::is_detected_v<have_level_zero_op_non_type_t, derivedT>) {
1239 return derivedT::have_level_zero_op;
1257 ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1259 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1264 std::tuple_size_v<input_refs_tuple_type>;
1270 template <std::
size_t i,
typename resultT,
typename InTuple>
1271 static resultT
get(InTuple &&intuple) {
1272 return static_cast<resultT
>(std::get<i>(std::forward<InTuple>(intuple)));
1274 template <std::
size_t i,
typename InTuple>
1275 static auto &
get(InTuple &&intuple) {
1276 return std::get<i>(std::forward<InTuple>(intuple));
1286 constexpr
static const size_t task_key_offset =
sizeof(task_t);
1289 output_terminalsT output_terminals;
1295 template <std::size_t... IS>
1296 static constexpr
auto make_set_args_fcts(std::index_sequence<IS...>) {
1297 using resultT = decltype(set_arg_from_msg_fcts);
1298 return resultT{{&TT::set_arg_from_msg<IS>...}};
1300 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_arg_from_msg_fcts =
1301 make_set_args_fcts(std::make_index_sequence<numins>{});
1303 template <std::size_t... IS>
1304 static constexpr
auto make_set_size_fcts(std::index_sequence<IS...>) {
1305 using resultT = decltype(set_argstream_size_from_msg_fcts);
1306 return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
1308 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1309 make_set_size_fcts(std::make_index_sequence<numins>{});
1311 template <std::size_t... IS>
1312 static constexpr
auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1313 using resultT = decltype(finalize_argstream_from_msg_fcts);
1314 return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
1316 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1317 make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
1319 template <std::size_t... IS>
1320 static constexpr
auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1321 using resultT = decltype(get_from_pull_msg_fcts);
1322 return resultT{{&TT::get_from_pull_msg<IS>...}};
1324 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1325 make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
1327 template<std::size_t... IS>
1328 constexpr
static auto make_input_is_const(std::index_sequence<IS...>) {
1329 using resultT = decltype(input_is_const);
1330 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1332 constexpr
static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1335 ttg::meta::detail::keymap_t<keyT> keymap;
1336 ttg::meta::detail::keymap_t<keyT> priomap;
1337 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1339 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1341 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = {
nullptr };
1342 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1343 int num_pullins = 0;
1347 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1348 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1358 auto op(Args &&...args) {
1359 derivedT *derived =
static_cast<derivedT *
>(
this);
1361 using return_type = decltype(derived->op(std::forward<Args>(args)...));
1362 if constexpr (std::is_same_v<return_type,void>) {
1363 derived->op(std::forward<Args>(args)...);
1367 return derived->op(std::forward<Args>(args)...);
1371 template <std::
size_t i,
typename terminalT,
typename Key>
1372 void invoke_pull_terminal(terminalT &
in,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1373 if (
in.is_pull_terminal) {
1374 auto owner =
in.container.owner(key);
1375 if (owner != world.rank()) {
1376 get_pull_terminal_data_from<i>(owner, key);
1379 set_arg<i>(key, (
in.container).get(key));
1384 template <std::
size_t i,
typename Key>
1385 void get_pull_terminal_data_from(
const int owner,
1387 using msg_t = detail::msg_t;
1388 auto &world_impl = world.impl();
1389 parsec_taskpool_t *tp = world_impl.taskpool();
1390 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), tp->taskpool_id,
1395 pos =
pack(key, msg->bytes, pos);
1396 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1397 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1398 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
1399 sizeof(msg_header_t) + pos);
1402 template <std::size_t... IS,
typename Key = keyT>
1403 void invoke_pull_terminals(std::index_sequence<IS...>,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1404 int junk[] = {0, (invoke_pull_terminal<IS>(
1405 std::get<IS>(input_terminals), key, task),
1410 template <std::size_t... IS>
1411 static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1413 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>
> *>(
1414 task->copies[IS]->get_ptr()))...};
1417 #ifdef TTG_HAVE_DEVICE
1421 template <ttg::ExecutionSpace Space>
1422 static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
1423 parsec_gpu_task_t *gpu_task,
1424 parsec_gpu_exec_stream_t *gpu_stream) {
1426 task_t *task = (task_t*)gpu_task->ec;
1428 ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1430 task->dev_ptr->stream = gpu_stream;
1435 auto dev_data = dev_task.promise();
1438 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1439 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1441 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1443 parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
1449 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1451 parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
1457 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1459 parsec_level_zero_exec_stream_t *stream;
1460 stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
1467 static_op<Space>(&task->parsec_task);
1472 int rc = PARSEC_HOOK_RETURN_DONE;
1473 if (
nullptr != task->suspended_task_address) {
1475 dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1476 dev_data = dev_task.promise();
1478 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1479 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1480 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1482 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1483 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1488 rc = PARSEC_HOOK_RETURN_AGAIN;
1497 template <ttg::ExecutionSpace Space>
1498 static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
1500 task_t *task = (task_t*)parsec_task;
1501 if (task->dev_ptr->gpu_task ==
nullptr) {
1504 parsec_gpu_task_t *gpu_task;
1506 gpu_task =
static_cast<parsec_gpu_task_t*
>(std::calloc(1,
sizeof(*gpu_task)));
1507 PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
1508 gpu_task->ec = parsec_task;
1509 gpu_task->task_type = 0;
1510 gpu_task->last_data_check_epoch = 0;
1511 gpu_task->pushout = 0;
1512 gpu_task->submit = &TT::device_static_submit<Space>;
1521 task->dev_ptr->gpu_task = gpu_task;
1524 task->parsec_task.chore_mask = PARSEC_DEV_ALL;
1527 task->dev_ptr->task_class = *task->parsec_task.task_class;
1530 static_op<Space>(parsec_task);
1534 parsec_task_class_t& tc = task->dev_ptr->task_class;
1537 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1538 tc.in[i] = gpu_task->flow[i];
1539 tc.out[i] = gpu_task->flow[i];
1541 tc.nb_flows = MAX_PARAM_COUNT;
1545 if (tt->devicemap) {
1547 if constexpr (std::is_void_v<keyT>) {
1552 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1554 if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
1555 parsec_data_t *
data = parsec_task->data[i].data_in->original;
1559 if (
data->owner_device == 0) {
1560 parsec_advise_data_on_device(
data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
1567 task->parsec_task.task_class = &task->dev_ptr->task_class;
1570 return PARSEC_HOOK_RETURN_DONE;
1573 std::cerr <<
"EVALUATE called on task with assigned GPU task!" << std::endl;
1576 return PARSEC_HOOK_RETURN_ERROR;
1580 template <ttg::ExecutionSpace Space>
1581 static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
1586 task_t *task = (task_t*)parsec_task;
1588 if (
nullptr == task->suspended_task_address) {
1590 return PARSEC_HOOK_RETURN_DONE;
1594 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1597 ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1599 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1600 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1602 return PARSEC_HOOK_RETURN_DONE;
1605 parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1606 assert(NULL != device);
1608 task->dev_ptr->device = device;
1609 parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1610 parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
1612 switch(device->super.type) {
1614 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1615 case PARSEC_DEV_CUDA:
1619 gpu_task->stage_in = parsec_default_gpu_stage_in;
1620 gpu_task->stage_out = parsec_default_gpu_stage_out;
1621 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1625 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1626 case PARSEC_DEV_HIP:
1628 gpu_task->stage_in = parsec_default_gpu_stage_in;
1629 gpu_task->stage_out = parsec_default_gpu_stage_out;
1630 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1634 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1635 case PARSEC_DEV_LEVEL_ZERO:
1637 gpu_task->stage_in = parsec_default_gpu_stage_in;
1638 gpu_task->stage_out = parsec_default_gpu_stage_out;
1639 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1646 ttg::print_error(task->tt->get_name(),
" : received mismatching device type ", (
int)device->super.type,
" from PaRSEC");
1648 return PARSEC_HOOK_RETURN_DONE;
1652 template <ttg::ExecutionSpace Space>
1653 static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1655 task_t *task = (task_t*)parsec_task;
1656 void* suspended_task_address =
1657 #ifdef TTG_HAVE_COROUTINE
1658 task->suspended_task_address;
1663 if (suspended_task_address ==
nullptr) {
1665 ttT *baseobj = task->tt;
1666 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1669 if (obj->tracing()) {
1670 if constexpr (!ttg::meta::is_void_v<keyT>)
1671 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": executing");
1673 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : executing");
1676 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1677 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1678 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, std::move(input), obj->output_terminals));
1679 }
else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1680 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1681 }
else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1682 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1683 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(std::move(input), obj->output_terminals));
1684 }
else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1685 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1693 #ifdef TTG_HAVE_COROUTINE
1696 #ifdef TTG_HAVE_DEVICE
1698 ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1702 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1703 task->tt->set_outputs_tls_ptr();
1705 if (coro.completed()) {
1707 suspended_task_address =
nullptr;
1709 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1715 assert(ret.ready());
1716 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1717 task->tt->set_outputs_tls_ptr();
1719 if (ret.completed()) {
1721 suspended_task_address =
nullptr;
1729 for (
auto &event_ptr : events) {
1730 event_ptr->finish();
1734 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1736 task->suspended_task_address = suspended_task_address;
1744 #ifdef TTG_HAVE_COROUTINE
1745 task->suspended_task_address = suspended_task_address;
1747 if (suspended_task_address ==
nullptr) {
1748 ttT *baseobj = task->tt;
1749 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1750 if (obj->tracing()) {
1751 if constexpr (!ttg::meta::is_void_v<keyT>)
1752 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": done executing");
1754 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1758 return PARSEC_HOOK_RETURN_DONE;
// static_op_noarg<Space>: PaRSEC hook body for a task that carries no input
// values. When no coroutine is pending (suspended_task_address == nullptr) it
// invokes op<Space>(task->key, output_terminals), or op<Space>(output_terminals)
// when keyT is void. Under TTG_HAVE_COROUTINE the resulting suspended address
// is stored back into the task; a non-null address leads to
// PARSEC_HOOK_RETURN_AGAIN (intervening lines not shown), otherwise the hook
// returns PARSEC_HOOK_RETURN_DONE.
// NOTE(review): this listing omits lines (gaps in the embedded numbering),
// including the #else/#endif branches and closing braces.
1761 template <ttg::ExecutionSpace Space>
1762 static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1763 task_t *task =
static_cast<task_t*
>(parsec_task);
1765 void* suspended_task_address =
1766 #ifdef TTG_HAVE_COROUTINE
1767 task->suspended_task_address;
1771 if (suspended_task_address ==
nullptr) {
1772 ttT *baseobj = (
ttT *)task->object_ptr;
1773 derivedT *obj = (derivedT *)task->object_ptr;
1776 if constexpr (!ttg::meta::is_void_v<keyT>) {
1777 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1778 }
else if constexpr (ttg::meta::is_void_v<keyT>) {
1779 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1785 #ifdef TTG_HAVE_COROUTINE
1787 assert(ret.ready());
1789 if (ret.completed()) {
1791 suspended_task_address =
nullptr;
// Persist the (possibly null) coroutine address on the task; the hook reads it
// back at entry to decide whether to run op again.
1800 task->suspended_task_address = suspended_task_address;
1802 if (suspended_task_address) {
1805 return PARSEC_HOOK_RETURN_AGAIN;
1808 return PARSEC_HOOK_RETURN_DONE;
// static_reducer_op<i>: PaRSEC hook for a reducer task attached to input
// terminal i of rtask->parent_task.
// - For non-void values it pops pending copies from
//   parent_task->streams[i].reduce_copies and folds each one into target_copy
//   (parent_task->copies[i]) using the terminal's reducer.
// - For void values it only increments streams[i].size.
// - It keeps working while decrementing streams[i].reduce_count leaves it
//   above zero.
// - Once the stream size reaches streams[i].goal and no reductions remain
//   (c == 0), the parent task is released.
// - A "first" reducer that finds reduce_count dropping to zero returns
//   PARSEC_HOOK_RETURN_DONE immediately.
// NOTE(review): lines are missing from this listing (gaps in numbering).
1811 template <std::
size_t i>
1812 static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1813 using rtask_t = detail::reducer_task_t;
1814 using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1815 constexpr
const bool val_is_void = ttg::meta::is_void_v<value_t>;
1816 constexpr
const bool input_is_const = std::is_const_v<value_t>;
1817 rtask_t *rtask = (rtask_t*)parsec_task;
1818 task_t *parent_task =
static_cast<task_t*
>(rtask->parent_task);
1819 ttT *baseobj = parent_task->tt;
1820 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1822 auto& reducer = std::get<i>(baseobj->input_reducers);
1826 if (obj->tracing()) {
1827 if constexpr (!ttg::meta::is_void_v<keyT>)
1828 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": reducer executing");
1830 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : reducer executing");
// The parent task's copy for input i is the reduction target.
1834 detail::ttg_data_copy_t *target_copy;
1835 target_copy = parent_task->copies[i];
1836 assert(val_is_void ||
nullptr != target_copy);
1839 std::size_t
size = 0;
1840 assert(parent_task->streams[i].reduce_count > 0);
1841 if (rtask->is_first) {
1842 if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1844 if (obj->tracing()) {
1845 if constexpr (!ttg::meta::is_void_v<keyT>)
1846 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": first reducer empty");
1848 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : first reducer empty");
1851 return PARSEC_HOOK_RETURN_DONE;
1859 if constexpr(!val_is_void) {
1861 detail::ttg_data_copy_t *source_copy;
1862 parsec_list_item_t *item;
// Pending copies are queued on a LIFO; each list item wraps its copy via ->self.
1863 item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
1864 if (
nullptr == item) {
1868 source_copy = ((detail::ttg_data_copy_self_t *)(item))->
self;
1869 assert(target_copy->num_readers() == target_copy->mutable_tag);
1870 assert(source_copy->num_readers() > 0);
1871 reducer(*
reinterpret_cast<std::decay_t<value_t> *
>(target_copy->get_ptr()),
1872 *
reinterpret_cast<std::decay_t<value_t> *
>(source_copy->get_ptr()));
1874 }
else if constexpr(val_is_void) {
1878 size = ++parent_task->streams[i].size;
1880 }
while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1884 bool complete = (
size >= parent_task->streams[i].goal);
// Release the parent only when the stream is complete and no reduction is outstanding.
1889 if (complete && c == 0) {
1890 if constexpr(input_is_const) {
1892 target_copy->reset_readers();
1895 parent_task->remove_from_hash =
true;
1896 parent_task->release_task(parent_task);
1901 if (obj->tracing()) {
1902 if constexpr (!ttg::meta::is_void_v<keyT>)
1903 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": done executing");
1905 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1908 return PARSEC_HOOK_RETURN_DONE;
// unpack: deserialize obj from _bytes starting at byte offset pos using the
// data descriptor dd_t (presumably a local alias for the default descriptor
// of T, declared on a missing line — confirm).
// The payload size is handled per dd_t::serialize_size_is_const:
// - the non-const branch's body is missing from this listing;
// - the dd_t::payload_size(&obj) call shown is presumably the constant-size branch.
// The updated offset is returned by dd_t::unpack_payload; the return
// statement itself is not visible here.
1913 template <
typename T>
1914 uint64_t
unpack(T &obj,
void *_bytes, uint64_t pos) {
1916 uint64_t payload_size;
1917 if constexpr (!dd_t::serialize_size_is_const) {
1920 payload_size = dd_t::payload_size(&obj);
1922 pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
// pack (signature line not shown in this listing): serialize obj into bytes at
// offset pos via dd_t::pack_payload, using dd_t::payload_size(&obj) as the size.
// copy->iovec_reset() is called first; the condition guarding it is on missing
// lines (presumably only when a copy is supplied — confirm).
1926 template <
typename T>
1929 uint64_t payload_size = dd_t::payload_size(&obj);
1932 copy->iovec_reset();
1935 if constexpr (!dd_t::serialize_size_is_const) {
1938 pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
// Incoming-message dispatcher (signature and the definition of hd are on lines
// missing from this listing). It switches on hd->fn_id and selects a member
// handler from one of obj's per-terminal tables:
// - set_arg_from_msg_fcts
// - set_argstream_size_from_msg_fcts
// - finalize_argstream_from_msg_fcts
// - get_from_pull_msg_fcts
// Each table is indexed by hd->param_id, and each access is bounds-checked by
// an assert.
1944 "Trying to unpack as message that does not hold enough bytes to represent a single header");
1946 derivedT *obj =
reinterpret_cast<derivedT *
>(bop);
1947 switch (hd->
fn_id) {
1951 assert(hd->
param_id < obj->set_arg_from_msg_fcts.size());
1952 auto member = obj->set_arg_from_msg_fcts[hd->
param_id];
1962 assert(hd->
param_id < obj->set_argstream_size_from_msg_fcts.size());
1963 auto member = obj->set_argstream_size_from_msg_fcts[hd->
param_id];
1969 assert(hd->
param_id < obj->finalize_argstream_from_msg_fcts.size());
1970 auto member = obj->finalize_argstream_from_msg_fcts[hd->
param_id];
1976 assert(hd->
param_id < obj->get_from_pull_msg_fcts.size());
1977 auto member = obj->get_from_pull_msg_fcts[hd->
param_id];
// Per-thread task mempool lookup (signature not shown). It returns this
// execution stream's entry in mempools.thread_mempools, indexed by
// vp_id * nb_cores + th_id.
1988 auto &world_impl = world.impl();
1989 parsec_execution_stream_s *es = world_impl.execution_stream();
1990 int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
1991 return &mempools.thread_mempools[index];
// set_arg_from_msg_keylist<i, valueT>: deliver one received data copy to input
// i of every task named in keylist.
// - A dummy task_t is placement-constructed from the mempool and attached to
//   the taskpool.
// - For each key the copy is passed to set_arg_local_impl: by reference when
//   valueT is copy-constructible, otherwise moved out of the copy.
// - On some path (condition on missing lines) a std::logic_error is thrown
//   when a datum must be copied but its type is not copyable.
// - Ready tasks are collected in task_ring and scheduled in one
//   __parsec_schedule_vp call.
// - Finally the dummy task is completed and returned to the mempool.
// NOTE(review): lines are missing from this listing (gaps in numbering).
1994 template <
size_t i,
typename valueT>
1998 parsec_execution_stream_s *es = world.impl().execution_stream();
2000 dummy =
new (parsec_thread_mempool_allocate(mempool))
task_t(mempool, &this->
self,
this);
2008 dummy->
parsec_task.taskpool = world.impl().taskpool();
2015 parsec_task_t *task_ring =
nullptr;
2016 for (
auto &&key : keylist) {
2018 if constexpr (std::is_copy_constructible_v<valueT>) {
2019 set_arg_local_impl<i>(key, *
reinterpret_cast<valueT *
>(copy->
get_ptr()), copy, &task_ring);
2023 static_assert(!std::is_reference_v<valueT>);
2025 set_arg_local_impl<i>(key, std::move(*
reinterpret_cast<valueT *
>(copy->
get_ptr())), copy, &task_ring);
2027 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<valueT>>().pretty_name() +
" but the type is not copyable");
// Schedule every task made ready above in a single batch.
2032 if (
nullptr != task_ring) {
2033 auto &world_impl = world.impl();
2034 parsec_task_t *vp_task_ring[1] = { task_ring };
2035 __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2042 complete_task_and_release(es, &dummy->
parsec_task);
2043 parsec_thread_mempool_free(mempool, &dummy->
parsec_task);
// set_arg_from_msg<i>: handle an incoming "set argument" message for input i.
// - For keyed TTs, tt_id.num_keys keys are unpacked and each is asserted to
//   map to this rank.
// - Non-void values are deserialized into a data copy in one of two ways:
//   - with no iovecs: directly, then delivered to every key through
//     set_arg_from_msg_keylist;
//   - with iovecs: the iovec payloads are either copied inline from the
//     message or fetched with parsec_ce.get using the sender's registration
//     handles, and delivery happens once the transfers complete.
// - Void values and void keys are forwarded to set_arg with ttg::Void{}.
// NOTE(review): many lines are missing from this listing (gaps in numbering).
2055 template <std::
size_t i>
2057 using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2059 msg_t *msg =
static_cast<msg_t *
>(
data);
2060 if constexpr (!ttg::meta::is_void_v<keyT>) {
2064 uint64_t key_end_pos;
2065 std::vector<keyT> keylist;
2066 int num_keys = msg->tt_id.num_keys;
2067 keylist.reserve(num_keys);
2068 auto rank = world.rank();
2069 for (
int k = 0; k < num_keys; ++k) {
2071 pos =
unpack(key, msg->bytes, pos);
2072 assert(keymap(key) ==
rank);
2073 keylist.push_back(std::move(key));
2079 if constexpr (!ttg::meta::is_void_v<valueT>) {
2080 using decvalueT = std::decay_t<valueT>;
2081 int32_t num_iovecs = msg->tt_id.num_iovecs;
2086 using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2089 metadata_t metadata;
2090 pos =
unpack(metadata, msg->bytes, pos);
2099 parsec_gpu_data_copy_t* gpu_elem;
2100 gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2103 while (i < parsec_nb_devices) {
2104 if (
nullptr == gpu_elem) {
2105 gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2106 gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2107 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2108 gpu_elem->version = 0;
2109 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2111 if (
nullptr == gpu_elem->device_private) {
2112 gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2113 if (
nullptr == gpu_elem->device_private) {
2122 pos =
unpack(*
static_cast<decvalueT *
>(copy->
get_ptr()), msg->bytes, pos);
2127 if (num_iovecs == 0) {
// keylist.data() instead of &keylist[0]: indexing an empty vector is UB if num_keys is 0.
2128 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(keylist.data(), num_keys), copy);
2133 int remote = msg->tt_id.sender;
2134 assert(remote < world.size());
2136 auto &val = *
static_cast<decvalueT *
>(copy->
get_ptr());
2138 bool inline_data = msg->tt_id.inline_data;
2142 auto handle_iovecs_fn =
2143 [&](
auto&& iovecs) {
2147 for (
auto &&iov : iovecs) {
2149 std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes);
2150 pos += iov.num_bytes;
// Non-inline path: read the sender's completion-callback tag, then one
// registration handle (size + bytes) and one callback pointer per iovec.
2154 parsec_ce_tag_t cbtag;
2155 std::memcpy(&cbtag, msg->bytes + pos,
sizeof(cbtag));
2156 pos +=
sizeof(cbtag);
2161 set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2162 this->world.impl().decrement_inflight_msg();
2165 using ActivationT = std::decay_t<decltype(*activation)>;
2167 for (
auto &&iov : iovecs) {
2169 parsec_ce_mem_reg_handle_t rreg;
2170 int32_t rreg_size_i;
2171 std::memcpy(&rreg_size_i, msg->bytes + pos,
sizeof(rreg_size_i));
2172 pos +=
sizeof(rreg_size_i);
2173 rreg =
static_cast<parsec_ce_mem_reg_handle_t
>(msg->bytes + pos);
2177 std::intptr_t fn_ptr;
2178 std::memcpy(&fn_ptr, msg->bytes + pos,
sizeof(fn_ptr));
2179 pos +=
sizeof(fn_ptr);
2182 parsec_ce_mem_reg_handle_t lreg;
2184 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2185 iov.num_bytes, &lreg, &lreg_size);
2186 world.impl().increment_inflight_msg();
2189 parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
2190 &detail::get_complete_cb<ActivationT>, activation,
2192 cbtag, &fn_ptr,
sizeof(std::intptr_t));
2198 handle_iovecs_fn(descr.get_data(val));
2204 assert(num_iovecs == nv);
// keylist.data() instead of &keylist[0]: indexing an empty vector is UB if num_keys is 0.
2208 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(keylist.data(), num_keys), copy);
2212 }
else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2213 for (
auto &&key : keylist) {
2214 set_arg<i, keyT, ttg::Void>(key,
ttg::Void{});
2218 }
else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2219 using decvalueT = std::decay_t<valueT>;
2222 unpack(val, msg->bytes, 0);
2223 set_arg<i, keyT, valueT>(std::move(val));
2225 }
else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2226 set_arg<i, keyT, ttg::Void>(
ttg::Void{});
// finalize_argstream_from_msg<i>: handle a "finalize stream" message for input i.
// For keyed TTs it unpacks the key, asserts it maps to this rank and calls
// finalize_argstream<i>(key). Otherwise it asserts keymap() is this rank and
// calls finalize_argstream<i>().
2232 template <std::
size_t i>
2235 msg_t *msg =
static_cast<msg_t *
>(
data);
2236 if constexpr (!ttg::meta::is_void_v<keyT>) {
2239 auto rank = world.rank();
2241 pos =
unpack(key, msg->bytes, pos);
2242 assert(keymap(key) ==
rank);
2243 finalize_argstream<i>(key);
2245 auto rank = world.rank();
2246 assert(keymap() ==
rank);
2247 finalize_argstream<i>();
// argstream_set_size_from_msg<i>: handle a "set stream size" message for input i.
// It unpacks the key (keyed TTs only) followed by a std::size_t stream size,
// asserts the target maps to this rank, and calls set_argstream_size<i>.
2251 template <std::
size_t i>
2254 auto msg =
static_cast<msg_t *
>(
data);
2256 if constexpr (!ttg::meta::is_void_v<keyT>) {
2258 auto rank = world.rank();
2260 pos =
unpack(key, msg->bytes, pos);
2261 assert(keymap(key) ==
rank);
2262 std::size_t argstream_size;
2263 pos =
unpack(argstream_size, msg->bytes, pos);
2264 set_argstream_size<i>(key, argstream_size);
2266 auto rank = world.rank();
2267 assert(keymap() ==
rank);
2268 std::size_t argstream_size;
2269 pos =
unpack(argstream_size, msg->bytes, pos);
2270 set_argstream_size<i>(argstream_size);
// get_from_pull_msg<i>: handle a pull message for input i. For keyed TTs it
// unpacks the key and sets input i from the input terminal's container,
// i.e. set_arg<i>(key, in.container.get(key)).
2274 template <std::
size_t i>
2277 msg_t *msg =
static_cast<msg_t *
>(
data);
2278 auto &
in = std::get<i>(input_terminals);
2279 if constexpr (!ttg::meta::is_void_v<keyT>) {
2283 pos =
unpack(key, msg->bytes, pos);
2284 set_arg<i>(key, (
in.container).get(key));
// set_arg_local overloads: deliver a value produced on this rank to input i by
// forwarding to set_arg_local_impl<i>.
// - Keyed overloads pass the key through.
// - Void-keyed overloads substitute ttg::Void{} as the key.
// - Rvalue forms use std::forward; const& forms pass the value by reference.
// - The shared_ptr<const Value> overload dereferences valueptr.
2288 template <std::
size_t i,
typename Key,
typename Value>
2289 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2290 const Key &key, Value &&value) {
2291 set_arg_local_impl<i>(key, std::forward<Value>(value));
2294 template <std::
size_t i,
typename Key = keyT,
typename Value>
2295 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2297 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2300 template <std::
size_t i,
typename Key,
typename Value>
2301 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2302 const Key &key,
const Value &value) {
2303 set_arg_local_impl<i>(key, value);
2306 template <std::
size_t i,
typename Key = keyT,
typename Value>
2307 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2308 const Value &value) {
2309 set_arg_local_impl<i>(
ttg::Void{}, value);
2312 template <std::
size_t i,
typename Key = keyT,
typename Value>
2313 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2314 std::shared_ptr<const Value> &valueptr) {
2315 set_arg_local_impl<i>(
ttg::Void{}, *valueptr);
// create_new_task (signature line not shown; presumably takes `const Key &key`):
// - placement-constructs a task_t in storage taken from the mempool;
// - uses priority priomap(key), or priomap() for void keys;
// - initialises each input stream's goal from static_stream_goal.
2318 template <
typename Key>
2320 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2321 auto &world_impl = world.impl();
2324 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2325 int32_t priority = 0;
2326 if constexpr (!keyT_is_Void) {
2327 priority = priomap(key);
2329 newtask =
new (taskobj)
task_t(key, mempool, &this->
self, world_impl.taskpool(),
this, priority);
2331 priority = priomap();
2333 newtask =
new (taskobj)
task_t(mempool, &this->
self, world_impl.taskpool(),
this, priority);
// Unsigned index: avoids a signed/unsigned comparison against size().
2336 for (
std::size_t i = 0; i < static_stream_goal.size(); ++i) {
2337 newtask->
streams[i].goal = static_stream_goal[i];
// create_new_reducer_task<i> (signature not shown; called elsewhere as
// create_new_reducer_task<i>(task, is_first)). It allocates a reducer task for
// input i of task from the mempool, with priority priomap(task->key), or
// priomap() for void keys, and forwards is_first to the constructor.
2345 template <std::
size_t i>
2349 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2350 auto &world_impl = world.impl();
2353 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2355 int32_t priority = 0;
2356 if constexpr (!keyT_is_Void) {
2357 priority = priomap(task->
key);
2360 priority = priomap();
2365 world_impl.taskpool(), priority, is_first);
// set_arg_local_impl<i>: deliver a value (or data copy) for input terminal i to
// the task identified by key, which must be owned by this rank (asserted via
// keymap).
// - Tasks with more than one input, or with a reducer, are looked up in
//   tasks_table and created on first arrival.
// - The incoming copy is registered with the task.
// - For a streaming input with a reducer and goal != 1, the copy is pushed
//   onto the stream's pending list and reducer tasks are submitted.
// - Setting an input that already holds a copy throws
//   std::logic_error("bad set arg").
// - task_ring, when provided, presumably collects ready tasks for batched
//   scheduling via release_task (the call is on missing lines).
// NOTE(review): many lines are missing from this listing (gaps in numbering).
2372 template <std::
size_t i,
typename Key,
typename Value>
2374 parsec_task_t **task_ring =
nullptr) {
2375 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2376 constexpr
const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2377 constexpr
const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2378 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2381 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": received value for argument : ", i);
2383 parsec_key_t hk = 0;
2384 if constexpr (!keyT_is_Void) {
2385 hk =
reinterpret_cast<parsec_key_t
>(&key);
2386 assert(keymap(key) == world.rank());
2390 auto &world_impl = world.impl();
2391 auto &reducer = std::get<i>(input_reducers);
2393 bool remove_from_hash =
true;
2394 #if defined(PARSEC_PROF_GRAPHER)
2395 bool discover_task =
true;
2397 bool get_pull_data =
false;
2398 bool has_lock =
false;
2400 if (numins > 1 || reducer) {
2403 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
2405 world_impl.increment_created();
2408 if( world_impl.dag_profiling() ) {
2409 #if defined(PARSEC_PROF_GRAPHER)
2410 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2414 }
else if (!reducer && numins == (task->
in_data_count + 1)) {
2416 parsec_hash_table_nolock_remove(&
tasks_table, hk);
2417 remove_from_hash =
false;
2421 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2426 world_impl.increment_created();
2427 remove_from_hash =
false;
2428 if( world_impl.dag_profiling() ) {
2429 #if defined(PARSEC_PROF_GRAPHER)
2430 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2436 if( world_impl.dag_profiling() ) {
2437 #if defined(PARSEC_PROF_GRAPHER)
2442 if(orig_index >= 0) {
2443 snprintf(orig_str, 32,
"%d", orig_index);
2445 strncpy(orig_str,
"_", 32);
// %zu: i is a std::size_t template parameter; %lu is only correct where size_t is unsigned long.
2447 snprintf(dest_str, 32,
"%zu", i);
2448 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2449 .flow_index = 0, .flow_datatype_mask = ~0 };
2450 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2451 .flow_index = 0, .flow_datatype_mask = ~0 };
2462 if (
nullptr != copy) {
2464 copy = detail::register_data_copy<valueT>(copy, task,
is_const);
2475 if (reducer && 1 != task->
streams[i].goal) {
2476 auto submit_reducer_task = [&](
auto *parent_task){
2478 std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2483 reduce_task = create_new_reducer_task<i>(parent_task,
false);
2488 if constexpr (!ttg::meta::is_void_v<valueT>) {
2491 if (
nullptr == (copy = task->
copies[i])) {
2492 using decay_valueT = std::decay_t<valueT>;
2497 reduce_task = create_new_reducer_task<i>(task,
true);
2501 task->
streams[i].reduce_count.store(1, std::memory_order_relaxed);
2517 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2520 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
// Queue the copy for the reducer. The operand is &copy->super; this token had
// been corrupted into a '©' character, which does not compile.
2526 parsec_lifo_push(&task->
streams[i].reduce_copies, &copy->
super);
2527 submit_reducer_task(task);
2531 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2533 submit_reducer_task(task);
2543 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2546 if constexpr (!valueT_is_Void) {
2547 if (
nullptr != task->
copies[i]) {
2549 throw std::logic_error(
"bad set arg");
2569 if constexpr (!ttg::meta::is_void_v<keyT>) {
2570 if (get_pull_data) {
// Constraint-check fragment. The enclosing signature is not shown; this is
// presumably a helper such as check_constraints(task_t *task) — confirm.
// It evaluates the first entry of constraints_check, passing task->key for
// keyed TTs, and returns true when the task is not constrained.
2577 bool constrained =
false;
2578 if (constraints_check.size() > 0) {
2579 if constexpr (ttg::meta::is_void_v<keyT>) {
2580 constrained = !constraints_check[0]();
2582 constrained = !constraints_check[0](task->
key);
2589 return !constrained;
// release_constraint for void-keyed TTs (signature not shown). Called when
// constraint cid is released. It re-evaluates constraints cid+1..N-1; the
// handling of a still-blocking constraint is on missing lines. It then
// schedules the task directly with __parsec_schedule_vp.
2592 template<
typename Key = keyT>
2595 assert(cid < constraints_check.size());
2597 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2598 if (!constraints_check[i]()) {
2606 parsec_key_t hk = 0;
2608 assert(task !=
nullptr);
2609 auto &world_impl = world.impl();
2610 parsec_execution_stream_t *es = world_impl.execution_stream();
2611 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2612 __parsec_schedule_vp(es, vp_task_rings, 0);
// release_constraint(cid, keys) for keyed TTs. Called when constraint cid is
// released for the given keys.
// - Each key's later constraints (cid+1..N-1) are re-checked.
// - Released tasks are looked up by key (lookup on missing lines) and
//   gathered into one ring ordered by parsec_task_t::priority.
// - The ring is scheduled in a single __parsec_schedule_vp call.
2616 template<
typename Key = keyT>
2617 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
release_constraint(std::size_t cid,
const std::span<Key>& keys) {
2618 assert(cid < constraints_check.size());
2619 parsec_task_t *task_ring =
nullptr;
2620 for (
auto& key : keys) {
2623 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2624 if (!constraints_check[i](key)) {
2632 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
2634 assert(task !=
nullptr);
2635 if (task_ring ==
nullptr) {
2640 parsec_list_item_ring_push_sorted(&task_ring->super, &task->
parsec_task.super,
2641 offsetof(parsec_task_t, priority));
2645 if (
nullptr != task_ring) {
2646 auto &world_impl = world.impl();
2647 parsec_execution_stream_t *es = world_impl.execution_stream();
2648 parsec_task_t *vp_task_rings[1] = { task_ring };
2649 __parsec_schedule_vp(es, vp_task_rings, 0);
// release_task (start of signature not shown; invoked elsewhere as
// release_task(task)). Atomically increments task->in_data_count. Once all
// numins inputs have arrived:
// - if task_ring is nullptr, the task is scheduled immediately;
// - otherwise it is placed on *task_ring: either starting the ring
//   (handling on missing lines) or inserted in priority order.
// For keyed TTs, if num_pullins + count reaches numins and the TT is lazy-pull,
// a pull path follows (on missing lines).
2654 parsec_task_t **task_ring =
nullptr) {
2655 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2664 count = parsec_atomic_fetch_inc_int32(&task->
in_data_count) + 1;
2665 assert(count <=
self.dependencies_goal);
2668 auto &world_impl = world.impl();
2669 ttT *baseobj = task->
tt;
2671 if (count == numins) {
2672 parsec_execution_stream_t *es = world_impl.execution_stream();
2673 parsec_key_t hk = task->
pkey();
2675 if constexpr (!keyT_is_Void) {
2684 if (
nullptr == task_ring) {
2685 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2686 __parsec_schedule_vp(es, vp_task_rings, 0);
2687 }
else if (*task_ring ==
nullptr) {
2692 parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->
parsec_task.super,
2693 offsetof(parsec_task_t, priority));
2696 }
else if constexpr (!ttg::meta::is_void_v<keyT>) {
2697 if ((baseobj->num_pullins + count == numins) && baseobj->
is_lazy_pull()) {
// set_arg overloads: public entry points for feeding input i.
// - Overloads that take a value forward to set_arg_impl<i>, using ttg::Void{}
//   as the key for void-keyed TTs.
// - The value-less forms set_arg() and set_arg(key) have bodies on missing
//   lines (presumably for void-valued inputs — confirm).
2705 template <std::
size_t i,
typename Key,
typename Value>
2706 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
const Key &key,
2708 set_arg_impl<i>(key, std::forward<Value>(value));
2712 template <std::
size_t i,
typename Key,
typename Value>
2713 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(Value &&value) {
2714 set_arg_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2717 template <std::
size_t i,
typename Key = keyT>
2718 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_arg() {
2723 template <std::
size_t i,
typename Key>
2724 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
set_arg(
const Key &key) {
2728 template<
typename Value,
typename Key>
2730 using decvalueT = std::decay_t<Value>;
2731 bool inline_data =
false;
2733 std::size_t iov_size = 0;
2734 std::size_t metadata_size = 0;
2737 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2738 iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2739 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2740 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2746 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2750 std::size_t pack_size = key_pack_size + metadata_size + iov_size;
// set_arg_impl<i>: set input i of the task for key.
// If keymap(key) is this rank, the value goes straight to set_arg_local_impl.
// Otherwise a msg_t is built in three parts:
// - the value: split metadata plus iovecs (copied inline, or exposed through
//   registered memory handles with a completion-callback tag and a heap
//   std::function pointer), or the plain packed object;
// - the key, packed after key_offset;
// - the message is then sent with parsec_ce.send_am to the owner.
// Profiling and grapher hooks are compiled in only under the PARSEC_PROF_* macros.
// NOTE(review): many lines are missing from this listing (gaps in numbering).
2758 template <std::
size_t i,
typename Key,
typename Value>
2761 using decvalueT = std::decay_t<Value>;
2762 using norefvalueT = std::remove_reference_t<Value>;
2763 norefvalueT *value_ptr = &value;
2765 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2766 if(world.impl().profiling()) {
2767 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2771 if constexpr (!ttg::meta::is_void_v<Key>)
2772 owner = keymap(key);
2775 if (owner == world.rank()) {
2776 if constexpr (!ttg::meta::is_void_v<keyT>)
2777 set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2779 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value), copy_in);
2780 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2781 if(world.impl().profiling()) {
2782 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2791 auto &world_impl = world.impl();
2794 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2797 if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2801 if (
nullptr == copy) {
2803 if (
nullptr == copy) {
2807 value_ptr =
static_cast<norefvalueT*
>(copy->
get_ptr());
2812 msg->tt_id.inline_data = inline_data;
2814 auto handle_iovec_fn = [&](
auto&& iovecs){
2818 for (
auto &&iov : iovecs) {
2819 std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
2820 pos += iov.num_bytes;
2827 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
2828 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
2829 pos +=
sizeof(cbtag);
2835 for (
auto &&iov : iovecs) {
2836 copy = detail::register_data_copy<decvalueT>(copy,
nullptr,
true);
2837 parsec_ce_mem_reg_handle_t lreg;
2840 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2841 iov.num_bytes, &lreg, &lreg_size);
2842 auto lreg_ptr = std::shared_ptr<void>{lreg, [](
void *
ptr) {
2843 parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)
ptr;
2844 parsec_ce.mem_unregister(&memreg);
2846 int32_t lreg_size_i = lreg_size;
2847 std::memcpy(msg->bytes + pos, &lreg_size_i,
sizeof(lreg_size_i));
2848 pos +=
sizeof(lreg_size_i);
2849 std::memcpy(msg->bytes + pos, lreg, lreg_size);
2853 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
2859 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
2860 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
2861 pos +=
sizeof(fn_ptr);
2868 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2869 num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2871 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2872 pos =
pack(metadata, msg->bytes, pos);
2874 handle_iovec_fn(iovs);
2878 pos =
pack(*value_ptr, msg->bytes, pos, copy);
2886 msg->tt_id.num_iovecs = num_iovecs;
2890 msg->tt_id.num_keys = 0;
2891 msg->tt_id.key_offset = pos;
2892 if constexpr (!ttg::meta::is_void_v<Key>) {
2893 size_t tmppos =
pack(key, msg->bytes, pos);
2895 msg->tt_id.num_keys = 1;
2898 parsec_taskpool_t *tp = world_impl.taskpool();
2899 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2900 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2902 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
2904 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2905 if(world.impl().profiling()) {
2906 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2909 #if defined(PARSEC_PROF_GRAPHER)
2914 if(orig_index >= 0) {
2915 snprintf(orig_str, 32,
"%d", orig_index);
2917 strncpy(orig_str,
"_", 32);
// %zu: i is a std::size_t template parameter; %lu is only correct where size_t is unsigned long.
2919 snprintf(dest_str, 32,
"%zu", i);
2920 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2921 .flow_index = 0, .flow_datatype_mask = ~0 };
2922 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2923 .flow_index = 0, .flow_datatype_mask = ~0 };
// broadcast_arg_local<i>: on this rank, set input i to value (sharing copy)
// for every key in [begin, end). Tasks made ready are collected into one ring
// and scheduled with a single __parsec_schedule_vp call.
// NOTE(review): the profiling start event is bcast_arg_start but the end event
// is set_arg_end; presumably the end should be the matching bcast_arg_end
// event — confirm that member exists before changing.
2931 template <
int i,
typename Iterator,
typename Value>
2933 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2934 if(world.impl().profiling()) {
2935 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2938 parsec_task_t *task_ring =
nullptr;
2944 for (
auto it = begin; it != end; ++it) {
2945 set_arg_local_impl<i>(*it, value, copy, &task_ring);
2948 if (
nullptr != task_ring) {
2949 parsec_task_t *vp_task_ring[1] = { task_ring };
2950 __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2952 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2953 if(world.impl().profiling()) {
2954 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
// broadcast_arg<i>: send value to input i of every task in keylist.
// If any key is owned by another rank:
// - Keys are copied and sorted by owner rank relative to this rank, i.e. by
//   (owner - rank) mod np, so each owner's keys become contiguous.
// - The value portion of the message is built once: inline iovec bytes, or
//   memory registrations kept alive in memregs.
// - Then, per remote owner, registration handles and a callback pointer are
//   appended, followed by that owner's keys, and the message is sent with
//   send_am.
// - The local key range [local_begin, local_end) is handled by
//   broadcast_arg_local.
// Presumably when no key is remote the whole keylist is handled locally (the
// final call).
// NOTE(review): many lines are missing from this listing (gaps in numbering).
2959 template <std::
size_t i,
typename Key,
typename Value>
2960 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2963 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2965 auto np = world.size();
2966 int rank = world.rank();
2968 bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
2969 [&](
const Key &key) { return keymap(key) != rank; });
2972 using decvalueT = std::decay_t<Value>;
// Order keys by owner, starting at this rank and wrapping around.
2975 std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
2976 std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](
const Key &a,
const Key &b)
mutable {
2977 int rank_a = keymap(a);
2978 int rank_b = keymap(b);
2980 int pos_a = (rank_a + np - rank) % np;
2981 int pos_b = (rank_b + np - rank) % np;
2982 return pos_a < pos_b;
2986 auto local_begin = keylist_sorted.end();
2987 auto local_end = keylist_sorted.end();
2989 int32_t num_iovs = 0;
2993 assert(
nullptr != copy);
2996 auto &world_impl = world.impl();
2997 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3002 bool inline_data =
can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
3003 msg->tt_id.inline_data = inline_data;
3005 std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
3006 auto handle_iovs_fn = [&](
auto&& iovs){
3010 for (
auto &&iov : iovs) {
3011 std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
3012 pos += iov.num_bytes;
3019 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
3020 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
3021 pos +=
sizeof(cbtag);
3023 for (
auto &&iov : iovs) {
3024 parsec_ce_mem_reg_handle_t lreg;
3026 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
3027 iov.num_bytes, &lreg, &lreg_size);
// The shared_ptr deleter unregisters the memory when the last owner releases it.
3029 memregs.push_back(std::make_pair(
static_cast<int32_t
>(lreg_size),
3031 std::shared_ptr<void>{lreg, [](
void *
ptr) {
3032 parsec_ce_mem_reg_handle_t memreg =
3033 (parsec_ce_mem_reg_handle_t)
ptr;
3035 parsec_ce.mem_unregister(&memreg);
3045 auto metadata = descr.get_metadata(value);
3046 pos =
pack(metadata, msg->bytes, pos);
3047 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(&value));
3048 num_iovs = std::distance(std::begin(iovs), std::end(iovs));
3049 memregs.reserve(num_iovs);
3050 handle_iovs_fn(iovs);
3054 pos =
pack(value, msg->bytes, pos, copy);
3060 msg->tt_id.num_iovecs = num_iovs;
// Offset just past the shared value portion; per-owner data is appended after it.
3062 std::size_t save_pos = pos;
3064 parsec_taskpool_t *tp = world_impl.taskpool();
3065 for (
auto it = keylist_sorted.begin(); it < keylist_sorted.end(); ) {
3067 auto owner = keymap(*it);
3068 if (owner ==
rank) {
3072 std::find_if_not(++it, keylist_sorted.end(), [&](
const Key &key) { return keymap(key) == rank; });
3085 for (
int idx = 0; idx < num_iovs; ++idx) {
3088 std::shared_ptr<void> lreg_ptr;
3089 std::tie(lreg_size, lreg_ptr) = memregs[idx];
3090 std::memcpy(msg->bytes + pos, &lreg_size,
sizeof(lreg_size));
3091 pos +=
sizeof(lreg_size);
3092 std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3096 copy = detail::register_data_copy<valueT>(copy,
nullptr,
true);
3098 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
3104 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
3105 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
3106 pos +=
sizeof(fn_ptr);
3111 msg->tt_id.key_offset = pos;
3117 pos =
pack(*it, msg->bytes, pos);
3119 }
while (it < keylist_sorted.end() && keymap(*it) == owner);
3120 msg->tt_id.num_keys = num_keys;
3122 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3123 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3125 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3129 broadcast_arg_local<i>(local_begin, local_end, value);
3132 broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
// set_args overloads: set several inputs at once from a tuple.
// - The two-sequence forms send tuple element Is to input terminal js[Is].
//   The `int junk[] = {0, (call, 0)...}` idiom expands one set_arg call per
//   index.
// - The single-sequence forms pass index_sequence_for<Ts...> as the tuple
//   indices and the caller's sequence as the terminal indices.
// - Keyed and void-keyed variants are selected by enable_if.
3139 template <
typename Key,
typename... Ts,
size_t... Is,
size_t... Js>
3140 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>,
3141 std::index_sequence<Js...>,
const Key &key,
3142 const std::tuple<Ts...> &args) {
3143 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3144 constexpr
size_t js[] = {Js...};
3145 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3151 template <
typename Key,
typename... Ts,
size_t... Is>
3152 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
const Key &key,
3153 const std::tuple<Ts...> &args) {
3154 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3160 template <
typename Key = keyT,
typename... Ts,
size_t... Is,
size_t... Js>
3161 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3162 const std::tuple<Ts...> &args) {
3163 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3164 constexpr
size_t js[] = {Js...};
3165 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3171 template <
typename Key = keyT,
typename... Ts,
size_t... Is>
3172 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
3173 const std::tuple<Ts...> &args) {
3174 set_args(std::index_sequence_for<Ts...>{}, is, args);
3180 template <std::
size_t i>
3182 assert(std::get<i>(input_reducers) &&
"TT::set_static_argstream_size called on nonstreaming input terminal");
3183 assert(
size > 0 &&
"TT::set_static_argstream_size(key,size) called with size=0");
3185 this->
trace(world.rank(),
":",
get_name(),
": setting global stream size for terminal ", i);
3188 if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3190 throw std::runtime_error(
"TT::set_static_argstream_size called for a bounded stream");
3193 static_stream_goal[i] =
size;
3199 template <std::
size_t i,
typename Key>
3202 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3203 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3206 const auto owner = keymap(key);
3207 if (owner != world.rank()) {
3208 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : forwarding stream size for terminal ", i);
3210 auto &world_impl = world.impl();
3212 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3214 world_impl.rank(), 1);
3216 pos =
pack(key, msg->bytes, pos);
3218 parsec_taskpool_t *tp = world_impl.taskpool();
3219 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3220 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3221 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3224 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : setting stream size to ",
size,
" for terminal ", i);
3226 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3229 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3231 world.impl().increment_created();
3233 if( world.impl().dag_profiling() ) {
3234 #if defined(PARSEC_PROF_GRAPHER)
3235 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3239 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3249 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3251 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3260 template <std::
size_t i,
typename Key = keyT>
3263 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3264 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3267 const auto owner = keymap();
3268 if (owner != world.rank()) {
3269 ttg::trace(world.rank(),
":",
get_name(),
" : forwarding stream size for terminal ", i);
3271 auto &world_impl = world.impl();
3273 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3275 world_impl.rank(), 0);
3277 parsec_taskpool_t *tp = world_impl.taskpool();
3278 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3279 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3280 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3285 parsec_key_t hk = 0;
3288 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3290 world.impl().increment_created();
3292 if( world.impl().dag_profiling() ) {
3293 #if defined(PARSEC_PROF_GRAPHER)
3294 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3298 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3308 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3310 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3318 template <std::
size_t i,
typename Key>
3321 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3324 const auto owner = keymap(key);
3325 if (owner != world.rank()) {
3326 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": forwarding stream finalize for terminal ", i);
3328 auto &world_impl = world.impl();
3330 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3332 world_impl.rank(), 1);
3334 pos =
pack(key, msg->bytes, pos);
3335 parsec_taskpool_t *tp = world_impl.taskpool();
3336 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3337 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3338 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3341 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": finalizing stream for terminal ", i);
3343 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3348 " : error finalize called on stream that never received an input data: ", i);
3349 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3360 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3362 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3363 if (1 == c && (task->
streams[i].size >= 1)) {
3370 template <std::
size_t i,
bool key_is_
void = ttg::meta::is_
void_v<keyT>>
3373 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3376 const auto owner = keymap();
3377 if (owner != world.rank()) {
3378 ttg::trace(world.rank(),
":",
get_name(),
": forwarding stream finalize for terminal ", i);
3380 auto &world_impl = world.impl();
3382 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3384 world_impl.rank(), 0);
3385 parsec_taskpool_t *tp = world_impl.taskpool();
3386 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3387 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3388 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3393 auto hk =
static_cast<parsec_key_t
>(0);
3397 " : error finalize called on stream that never received an input data: ", i);
3398 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3409 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3411 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3412 if (1 == c && (task->
streams[i].size >= 1)) {
3422 auto check_parsec_data = [&](parsec_data_t*
data) {
3423 if (
data->owner_device != 0) {
3426 while (flowidx < MAX_PARAM_COUNT &&
3427 gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3434 if (flowidx == MAX_PARAM_COUNT) {
3435 throw std::runtime_error(
"Cannot add more than MAX_PARAM_COUNT flows to a task!");
3437 if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3440 gpu_task->flow_nb_elts[flowidx] =
data->nb_elts;
3443 ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3444 gpu_task->pushout |= 1<<flowidx;
3452 template <std::
size_t i,
typename Value,
typename RemoteCheckFn>
3453 std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3456 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
3457 static constexpr
const bool value_is_const = std::is_const_v<valueT>;
3464 if (
nullptr == copy) {
3469 bool need_pushout =
false;
3477 auto &reducer = std::get<i>(input_reducers);
3485 if constexpr (value_is_const) {
3504 need_pushout =
true;
3511 need_pushout =
true;
3515 need_pushout =
true;
3522 need_pushout =
true;
3526 if (!need_pushout) {
3527 bool device_supported =
false;
3537 if (!device_supported) {
3538 need_pushout = remote_check();
3549 template <std::
size_t i,
typename Key,
typename Value>
3550 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3553 auto remote_check = [&](){
3555 int rank = world.rank();
3556 bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3557 [&](
const Key &key) { return keymap(key) != rank; });
3560 do_prepare_send<i>(value, remote_check);
3563 template <std::
size_t i,
typename Key,
typename Value>
3564 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3567 auto remote_check = [&](){
3569 int rank = world.rank();
3570 return (keymap() !=
rank);
3572 do_prepare_send<i>(value, remote_check);
// TT is neither copyable nor movable. The constructor registers `this` with the
// world (world_impl.register_op(this)), and the input-terminal callbacks capture
// `this` by pointer. Duplicating or relocating an instance would therefore leave
// the world and the terminals pointing at the wrong object.
3585 TT(
const TT &other) =
delete;
3586 TT &operator=(
const TT &other) =
delete;
3587 TT(
TT &&other) =
delete;
3588 TT &operator=(
TT &&other) =
delete;
3591 template <
typename terminalT, std::
size_t i>
3592 void register_input_callback(terminalT &input) {
3593 using valueT = std::decay_t<typename terminalT::value_type>;
3594 if (input.is_pull_terminal) {
3600 if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3601 auto move_callback = [
this](
const keyT &key, valueT &&value) {
3602 set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3604 auto send_callback = [
this](
const keyT &key,
const valueT &value) {
3605 set_arg<i, keyT, const valueT &>(key, value);
3607 auto broadcast_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3608 broadcast_arg<i, keyT, valueT>(keylist, value);
3610 auto prepare_send_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3611 prepare_send<i, keyT, valueT>(keylist, value);
3613 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3614 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3615 input.set_callback(send_callback, move_callback, broadcast_callback,
3616 setsize_callback, finalize_callback, prepare_send_callback);
3621 else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3622 auto send_callback = [
this](
const keyT &key) { set_arg<i, keyT, ttg::Void>(key,
ttg::Void{}); };
3623 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3624 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3625 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3634 else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3635 auto move_callback = [
this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3636 auto send_callback = [
this](
const valueT &value) {
3637 if constexpr (std::is_copy_constructible_v<valueT>) {
3638 set_arg<i, keyT, const valueT &>(value);
3641 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() +
" which is not copy constructible, std::move datum into send/broadcast statement");
3644 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3645 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3646 auto prepare_send_callback = [
this](
const valueT &value) {
3647 prepare_send<i, void>(value);
3649 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3654 else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3655 auto send_callback = [
this]() { set_arg<i, keyT, ttg::Void>(
ttg::Void{}); };
3656 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3657 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3658 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3668 template <std::size_t... IS>
3669 void register_input_callbacks(std::index_sequence<IS...>) {
3672 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3677 template <std::size_t... IS,
typename inedgesT>
3678 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3679 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3683 template <std::size_t... IS,
typename outedgesT>
3684 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3685 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3690 template <
typename input_terminals_tupleT, std::size_t... IS,
typename flowsT>
3691 void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3693 (*(
const_cast<std::remove_const_t<decltype(flows[IS]-
>flow_flags)> *>(&(flows[IS]->flow_flags))) =
3694 (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3695 : PARSEC_FLOW_ACCESS_RW),
3700 template <
typename input_terminals_tupleT,
typename flowsT>
3701 void initialize_flows(flowsT &&flows) {
3702 _initialize_flows<input_terminals_tupleT>(
3709 static int key_equal(parsec_key_t a, parsec_key_t b,
void *user_data) {
3710 if constexpr (std::is_same_v<keyT, void>) {
3713 keyT &ka = *(
reinterpret_cast<keyT *
>(a));
3714 keyT &kb = *(
reinterpret_cast<keyT *
>(b));
3719 static uint64_t key_hash(parsec_key_t k,
void *user_data) {
3720 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3721 if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3724 keyT &kk = *(
reinterpret_cast<keyT *
>(k));
3726 uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3731 static char *key_print(
char *buffer,
size_t buffer_size, parsec_key_t k,
void *user_data) {
3732 if constexpr (std::is_same_v<keyT, void>) {
3736 keyT kk = *(
reinterpret_cast<keyT *
>(k));
3737 std::stringstream iss;
3739 memset(buffer, 0, buffer_size);
3740 iss.get(buffer, buffer_size);
3745 static parsec_key_t make_key(
const parsec_taskpool_t *tp,
const parsec_assignment_t *as) {
3747 keyT *key = *(keyT**)&(as[2]);
3748 return reinterpret_cast<parsec_key_t
>(key);
3751 static char *parsec_ttg_task_snprintf(
char *buffer,
size_t buffer_size,
const parsec_task_t *parsec_task) {
3752 if(buffer_size == 0)
3755 if constexpr (ttg::meta::is_void_v<keyT>) {
3756 snprintf(buffer, buffer_size,
"%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3758 const task_t *task =
reinterpret_cast<const task_t*
>(parsec_task);
3759 std::stringstream ss;
3762 std::string keystr = ss.str();
3763 std::replace(keystr.begin(), keystr.end(),
'(',
':');
3764 std::replace(keystr.begin(), keystr.end(),
')',
':');
3766 snprintf(buffer, buffer_size,
"%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3771 #if defined(PARSEC_PROF_TRACE)
3772 static void *parsec_ttg_task_info(
void *dst,
const void *
data,
size_t size)
3774 const task_t *task =
reinterpret_cast<const task_t *
>(
data);
3776 if constexpr (ttg::meta::is_void_v<keyT>) {
3777 snprintf(
reinterpret_cast<char*
>(dst),
size,
"()");
3779 std::stringstream ss;
3781 snprintf(
reinterpret_cast<char*
>(dst),
size,
"%s", ss.str().c_str());
// Key callbacks (key_equal, key_print, key_hash) that PaRSEC uses to compare,
// print and hash task keys. The main task class and every reducer task class
// install this table as their `key_functions`.
3787 parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3789 template<std::
size_t I>
3790 inline static void increment_data_version_impl(task_t *task) {
3791 if constexpr (!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
3792 if (task->copies[I] !=
nullptr){
3793 task->copies[I]->inc_current_version();
3798 template<std::size_t... Is>
3799 inline static void increment_data_versions(task_t *task, std::index_sequence<Is...>) {
3801 int junk[] = {0, (increment_data_version_impl<Is>(task), 0)...};
3805 static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3809 task_t *task = (task_t*)parsec_task;
3811 #ifdef TTG_HAVE_COROUTINE
3813 if (task->suspended_task_address) {
3815 #ifdef TTG_HAVE_DEVICE
3822 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3825 auto dev_data = dev_task.promise();
3827 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3828 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3831 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3834 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3835 task->tt->set_outputs_tls_ptr();
3836 dev_data.do_sends();
3837 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3843 task->suspended_task_address =
nullptr;
3848 for (
int i = 0; i < task->data_count; i++) {
3849 detail::ttg_data_copy_t *copy = task->
copies[i];
3850 if (
nullptr == copy)
continue;
3852 task->copies[i] =
nullptr;
3855 for (
auto& c : task->tt->constraints_complete) {
3856 if constexpr(std::is_void_v<keyT>) {
3862 return PARSEC_HOOK_RETURN_DONE;
3866 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3867 typename priomapT = ttg::detail::default_priomap<keyT>>
3868 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3869 ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3870 :
ttg::
TTBase(name, numinedges, numouts)
3873 , keymap(std::is_same<keymapT,
ttg::detail::default_keymap<keyT>>::value
3874 ? decltype(keymap)(
ttg::detail::default_keymap<keyT>(world))
3875 : decltype(keymap)(std::forward<keymapT>(keymap_)))
3876 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3878 if (innames.size() != numinedges)
throw std::logic_error(
"ttg_parsec::TT: #input names != #input terminals");
3879 if (outnames.size() != numouts)
throw std::logic_error(
"ttg_parsec::TT: #output names != #output terminals");
3881 auto &world_impl = world.
impl();
3882 world_impl.register_op(
this);
3884 if constexpr (numinedges == numins) {
3892 register_input_callbacks(std::make_index_sequence<numinedges>{});
3895 memset(&
self, 0,
sizeof(parsec_task_class_t));
3897 self.name = strdup(
get_name().c_str());
3899 self.nb_parameters = 0;
3902 self.nb_flows = MAX_PARAM_COUNT;
3905 if( world_impl.profiling() ) {
3907 self.nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3909 self.nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3920 self.make_key = make_key;
3921 self.key_functions = &tasks_hash_fcts;
3922 self.task_snprintf = parsec_ttg_task_snprintf;
3924 #if defined(PARSEC_PROF_TRACE)
3925 self.profile_info = &parsec_ttg_task_info;
3928 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
3933 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3934 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
3935 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3936 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3937 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3938 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3939 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3941 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3942 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_HIP;
3943 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3944 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_hip<TT>;
3946 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3947 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3948 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3949 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3951 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3952 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3953 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3954 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3956 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3957 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3958 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3961 self.incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
3962 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CPU;
3963 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
3964 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook<TT>;
3965 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3966 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3967 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3971 self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3972 self.complete_execution = complete_task_and_release;
3974 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3975 parsec_flow_t *flow =
new parsec_flow_t;
3976 flow->name = strdup((std::string(
"flow in") + std::to_string(i)).c_str());
3977 flow->sym_type = PARSEC_SYM_INOUT;
3980 flow->dep_in[0] = NULL;
3981 flow->dep_out[0] = NULL;
3982 flow->flow_index = i;
3983 flow->flow_datatype_mask = ~0;
3984 *((parsec_flow_t **)&(
self.
in[i])) = flow;
3989 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3990 parsec_flow_t *flow =
new parsec_flow_t;
3991 flow->name = strdup((std::string(
"flow out") + std::to_string(i)).c_str());
3992 flow->sym_type = PARSEC_SYM_INOUT;
3993 flow->flow_flags = PARSEC_FLOW_ACCESS_READ;
3994 flow->dep_in[0] = NULL;
3995 flow->dep_out[0] = NULL;
3996 flow->flow_index = i;
3997 flow->flow_datatype_mask = (1 << i);
3998 *((parsec_flow_t **)&(
self.
out[i])) = flow;
4003 self.dependencies_goal = numins;
4006 auto *context = world_impl.context();
4007 for (
int i = 0; i < context->nb_vp; i++) {
4008 nbthreads += context->virtual_processes[i]->nb_cores;
4011 parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t),
sizeof(
task_t),
4012 offsetof(parsec_task_t, mempool_owner), nbthreads);
4021 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4022 typename priomapT = ttg::detail::default_priomap<keyT>>
4023 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4026 std::forward<priomapT>(priomap)) {}
4028 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4029 typename priomapT = ttg::detail::default_priomap<keyT>>
4031 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
ttg::World world,
4032 keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
4033 :
TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
4034 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
4035 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
4037 if constexpr (numinedges > 0) {
4038 register_input_callbacks(std::make_index_sequence<numinedges>{});
4041 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4042 typename priomapT = ttg::detail::default_priomap<keyT>>
4044 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4047 std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4051 if(
nullptr !=
self.name ) {
4052 free((
void*)
self.name);
4053 self.name =
nullptr;
4056 for (std::size_t i = 0; i < numins; ++i) {
4057 if (inpute_reducers_taskclass[i] !=
nullptr) {
4058 std::free(inpute_reducers_taskclass[i]);
4059 inpute_reducers_taskclass[i] =
nullptr;
4067 ttT *op = (
ttT *)cb_data;
4068 if constexpr (!ttg::meta::is_void_v<keyT>) {
4069 std::cout <<
"Left over task " << op->
get_name() <<
" " << task->
key << std::endl;
4071 std::cout <<
"Left over task " << op->
get_name() << std::endl;
4089 parsec_mempool_destruct(&mempools);
4092 free((__parsec_chore_t *)
self.incarnations);
4093 for (
int i = 0; i < MAX_PARAM_COUNT; i++) {
4094 if (NULL !=
self.
in[i]) {
4095 free(
self.
in[i]->name);
4097 self.in[i] =
nullptr;
4099 if (NULL !=
self.
out[i]) {
4100 free(
self.
out[i]->name);
4102 self.out[i] =
nullptr;
4105 world.
impl().deregister_op(
this);
4115 template <std::
size_t i,
typename Reducer>
4118 std::get<i>(input_reducers) = reducer;
4120 parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4121 if (
nullptr == tc) {
4122 tc = (parsec_task_class_t *)std::calloc(1,
sizeof(*tc));
4123 inpute_reducers_taskclass[i] = tc;
4125 tc->name = strdup((
get_name() + std::string(
" reducer ") + std::to_string(i)).c_str());
4127 tc->nb_parameters = 0;
4129 tc->nb_flows = numflows;
4131 auto &world_impl = world.
impl();
4133 if( world_impl.profiling() ) {
4135 tc->nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4137 tc->nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4148 tc->make_key = make_key;
4149 tc->key_functions = &tasks_hash_fcts;
4150 tc->task_snprintf = parsec_ttg_task_snprintf;
4152 #if defined(PARSEC_PROF_TRACE)
4153 tc->profile_info = &parsec_ttg_task_info;
4156 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
4161 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
4162 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
4163 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
4165 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_CPU;
4166 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
4167 ((__parsec_chore_t *)
self.incarnations)[1].hook =
detail::hook;
4168 ((__parsec_chore_t *)
self.incarnations)[2].type = PARSEC_DEV_NONE;
4169 ((__parsec_chore_t *)
self.incarnations)[2].evaluate = NULL;
4170 ((__parsec_chore_t *)
self.incarnations)[2].hook = NULL;
4174 tc->incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
4175 ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4176 ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4177 ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4178 ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4179 ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4180 ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4184 tc->release_task = &parsec_release_task_to_mempool;
4185 tc->complete_execution = NULL;
4196 template <std::
size_t i,
typename Reducer>
4198 set_input_reducer<i>(std::forward<Reducer>(reducer));
4199 set_static_argstream_size<i>(
size);
4204 template <std::
size_t i>
4205 std::tuple_element_t<i, input_terminals_type> *
in() {
4206 return &std::get<i>(input_terminals);
4211 template <std::
size_t i>
4212 std::tuple_element_t<i, output_terminalsT> *
out() {
4213 return &std::get<i>(output_terminals);
4217 template <
typename Key = keyT>
4218 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4221 if constexpr(!std::is_same_v<Key, key_type>) {
4226 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4228 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4229 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4234 template <
typename Key = keyT>
4235 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4239 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
4241 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4242 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4246 template <
typename Key = keyT>
4247 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4251 if constexpr(!std::is_same_v<Key, key_type>) {
4256 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4257 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4262 template <
typename Key = keyT>
4263 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke() {
4266 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4267 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4272 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4279 template<
typename Key,
typename Arg,
typename... Args, std::size_t I, std::size_t... Is>
4280 void invoke_arglist(std::index_sequence<I, Is...>,
const Key& key, Arg&& arg, Args&&... args) {
4281 using arg_type = std::decay_t<Arg>;
4282 if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4287 copy->reset_readers();
4289 set_arg_impl<I>(key, val, copy);
4291 if constexpr (std::is_rvalue_reference_v<Arg>) {
4295 }
else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4296 set_arg<I>(key, std::forward<Arg>(arg));
4298 if constexpr (
sizeof...(Is) > 0) {
4300 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4306 template <
typename Key = keyT,
typename Arg,
typename... Args>
4307 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4308 const Key &key, Arg&& arg, Args&&... args) {
4309 static_assert(
sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4310 "Number of arguments to invoke must match the number of task inputs.");
4313 invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4314 std::forward<Arg>(arg), std::forward<Args>(args)...);
4317 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4318 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4322 m_defer_writer = value;
4326 return m_defer_writer;
4331 world.
impl().register_tt_profiling(
this);
4341 template <
typename Keymap>
4352 template <
typename Priomap>
4354 priomap = std::forward<Priomap>(pm);
4362 template<
typename Devicemap>
4367 devicemap = std::forward<Devicemap>(dm);
4370 devicemap = [=](
const keyT& key) {
4378 throw std::runtime_error(
"Unknown device type!");
4390 template<
typename Constra
int>
4392 std::size_t cid = constraints_check.size();
4393 if constexpr(ttg::meta::is_void_v<keyT>) {
4395 constraints_check.push_back([c,
this](){
return c->check(
this); });
4396 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(
this);
return true; });
4398 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4399 constraints_check.push_back([c,
this](
const keyT& key){
return c->check(key,
this); });
4400 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(key,
this);
return true; });
4406 template<
typename Constra
int>
4409 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4415 template<
typename Constra
int,
typename Mapper>
4417 static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
4418 std::size_t cid = constraints_check.size();
4419 if constexpr(ttg::meta::is_void_v<keyT>) {
4421 constraints_check.push_back([map, c,
this](){
return c->check(map(),
this); });
4422 constraints_complete.push_back([map, c,
this](){ c->complete(map(),
this);
return true; });
4424 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4425 constraints_check.push_back([map, c,
this](
const keyT& key){
return c->check(key, map(key),
this); });
4426 constraints_complete.push_back([map, c,
this](
const keyT& key){ c->complete(key, map(key),
this);
return true; });
4433 template<
typename Constra
int,
typename Mapper>
4436 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4442 MPI_Comm_rank(MPI_COMM_WORLD, &
rank);
4445 auto &world_impl = world.
impl();
4449 auto tp = world_impl.taskpool();
4455 std::vector<static_set_arg_fct_arg_t> tmp;
4456 for (
auto it = se.first; it != se.second;) {
4458 tmp.push_back(it->second);
4463 for (
auto it : tmp) {
4466 std::get<1>(it),
", ", std::get<2>(it),
")");
4467 int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
4468 std::get<0>(it), NULL);
4470 free(std::get<1>(it));
4493 bool do_release =
true;
4499 : copy_to_remove(h.copy_to_remove)
4501 h.copy_to_remove =
nullptr;
4507 std::swap(copy_to_remove, h.copy_to_remove);
4512 if (
nullptr != copy_to_remove) {
4520 template <
typename Value>
4521 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&>
operator()(Value &&value) {
4522 constexpr
auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4523 using value_type = std::remove_reference_t<Value>;
4524 static_assert(value_is_rvref ||
4525 std::is_copy_constructible_v<std::decay_t<Value>>,
4526 "Data sent without being moved must be copy-constructible!");
4529 if (
nullptr == caller) {
4530 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4535 value_type *value_ptr = &value;
4536 if (
nullptr == copy) {
4544 value_ptr =
reinterpret_cast<value_type *
>(copy->
get_ptr());
4545 copy_to_remove = copy;
4547 if constexpr (value_is_rvref) {
4555 if constexpr (value_is_rvref)
4556 return std::move(*value_ptr);
4561 template<
typename Value>
4564 if (
nullptr == caller) {
4565 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4569 if (
nullptr == copy) {
4574 copy_to_remove = copy;
4586 template <
typename Value>
4589 if (
nullptr == caller) {
4590 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4594 const Value *value_ptr = &value;
4595 if (
nullptr == copy) {
4603 value_ptr =
reinterpret_cast<Value *
>(copy->
get_ptr());
4604 copy_to_remove = copy;
#define TTG_OP_ASSERT_EXECUTABLE()
Edge is used to connect In and Out terminals.
A base class for all template tasks.
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
auto get_instance_id() const
virtual void make_executable()=0
Marks this executable.
const std::string & get_name() const
Gets the name of this operation.
void register_input_terminals(terminalsT &terms, const namesT &names)
const TTBase * ttg_ptr() const
void register_output_terminals(terminalsT &terms, const namesT &names)
A complete version of void.
Base class for implementation-specific Worlds.
WorldImplBase(int size, int rank)
bool is_valid(void) const
Represents a device in a specific execution space.
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
void finalize_argstream_from_msg(void *data, std::size_t size)
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
std::tuple_element_t< i, input_terminals_type > * in()
static constexpr bool derived_has_hip_op()
void set_keymap(Keymap &&km)
keymap setter
decltype(keymap) const & get_keymap() const
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
void print_incomplete_tasks()
void add_constraint(std::shared_ptr< Constraint > c)
void set_arg_from_msg(void *data, std::size_t size)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
void add_constraint(Constraint &&c)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
static constexpr bool derived_has_level_zero_op()
parsec_thread_mempool_t * get_task_mempool(void)
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
void add_constraint(Constraint c, Mapper &&map)
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
bool check_constraints(task_t *task)
task_t * create_new_task(const Key &key)
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
bool get_defer_writer(bool value)
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
static void ht_iter_cb(void *item, void *cb_data)
const auto & get_output_terminals() const
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
void set_input_reducer(Reducer &&reducer, std::size_t size)
output_terminalsT output_terminals_type
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
void set_input_reducer(Reducer &&reducer)
decltype(priomap) const & get_priomap() const
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
void copy_mark_pushout(detail::ttg_data_copy_t *copy)
void get_from_pull_msg(void *data, std::size_t size)
static constexpr int numinvals
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
ttg::World get_world() const override final
void set_defer_writer(bool value)
void make_executable() override
Marks this executable.
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
virtual void release() override
void set_devicemap(Devicemap &&dm)
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
void register_static_op_function(void)
static resultT get(InTuple &&intuple)
static auto & get(InTuple &&intuple)
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
actual_input_tuple_type input_args_type
void set_priomap(Priomap &&pm)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
std::tuple_element_t< i, output_terminalsT > * out()
void argstream_set_size_from_msg(void *data, std::size_t size)
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
void set_static_argstream_size(std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
static constexpr bool derived_has_device_op()
static constexpr const ttg::Runtime runtime
static constexpr bool derived_has_cuda_op()
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
virtual void execute() override
static constexpr int parsec_ttg_rma_tag()
void decrement_inflight_msg()
WorldImpl & operator=(const WorldImpl &other)=delete
const ttg::Edge & ctl_edge() const
void increment_inflight_msg()
WorldImpl(const WorldImpl &other)=delete
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs > *t)
virtual void profile_off() override
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
bool mpi_support(ttg::ExecutionSpace space)
virtual bool profiling() override
virtual void dag_off() override
virtual void fence_impl(void) override
virtual void dag_on(const std::string &filename) override
static constexpr int parsec_ttg_tag()
virtual void final_task() override
virtual void destroy() override
virtual void profile_on() override
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
auto * execution_stream()
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
bool complete_transfer(void)
constexpr auto data(C &c) -> decltype(c.data())
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
void set_current(int device, Stream stream)
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
bool all_devices_peer_access
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
std::size_t max_inline_size
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
int ttg_device_to_parsec_device(const ttg::device::Device &device)
const parsec_symbol_t parsec_taskclass_param1
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
constexpr const int PARSEC_TTG_MAX_AM_SIZE
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
const parsec_symbol_t parsec_taskclass_param2
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * create_new_datacopy(Value &&value)
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
void release_data_copy(ttg_data_copy_t *copy)
void transfer_ownership_impl(ttg_data_copy_t *copy, int device)
const parsec_symbol_t parsec_taskclass_param3
const parsec_symbol_t parsec_taskclass_param0
this contains PaRSEC-based TTG functionality
void ttg_fence(ttg::World world)
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
std::mutex static_map_mutex
void ttg_register_callback(ttg::World world, Callback &&callback)
ttg::Edge & ttg_ctl_edge(ttg::World world)
void make_executable_hook(ttg::World &)
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
ttg::World ttg_default_execution_context()
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
void ttg_execute(ttg::World world)
void ttg_sum(ttg::World world, double &value)
top-level TTG namespace contains runtime-neutral functionality
ExecutionSpace
denotes task execution space
int size(World world=default_execution_context())
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
World default_execution_context()
Accesses the default backend's default execution context.
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
bool tracing()
returns whether tracing is enabled
int rank(World world=default_execution_context())
ttg::World & get_default_world()
void trace(const T &t, const Ts &... ts)
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
value_copy_handler(value_copy_handler &&h)
value_copy_handler & operator=(value_copy_handler &&h)
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
value_copy_handler & operator=(const value_copy_handler &h)=delete
value_copy_handler()=default
Computes hash values for objects of type T.
task that can be resumed after some events occur
parsec_hash_table_t task_constraint_table
parsec_hash_table_t tasks_table
parsec_gpu_task_t * gpu_task
static constexpr std::size_t max_payload_size
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
unsigned char bytes[max_payload_size]
parsec_task_t parsec_task
ttg_data_copy_t ** copies
ttg_parsec_data_flags data_flags
parsec_hash_table_item_t tt_ht_item
std::array< stream_info_t, num_streams > streams
ttg_data_copy_t * copies[num_copies]
lvalue_reference_type value_ref
static void drop_all_ptr()
static void release_task(parsec_ttg_task_base_t *task_base)
static constexpr int mutable_tag
parsec_task_t * get_next_task() const
ttg::span< ttg::iovec > iovec_span()
virtual void * get_ptr()=0
void transfer_ownership(int access, int device=0)
void inc_current_version()
void foreach_parsec_data(Fn &&fn)
iovec_iterator iovec_begin()
void set_next_task(parsec_task_t *task)
iovec_iterator iovec_end()
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER