2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
13 #define TTG_PARSEC_DEFER_WRITER false
15 #include "ttg/config.h"
20 #include "../../ttg.h"
57 #include <experimental/type_traits>
72 #if defined(TTG_HAVE_MPI)
74 #if defined(TTG_HAVE_MPIEXT)
81 #include <parsec/class/parsec_hash_table.h>
82 #include <parsec/data_internal.h>
83 #include <parsec/execution_stream.h>
84 #include <parsec/interfaces/interface.h>
85 #include <parsec/mca/device/device.h>
86 #include <parsec/parsec_comm_engine.h>
87 #include <parsec/parsec_internal.h>
88 #include <parsec/scheduling.h>
89 #include <parsec/remote_dep.h>
90 #include <parsec/utils/mca_param.h>
92 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #include <parsec/mca/device/cuda/device_cuda.h>
95 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
96 #include <parsec/mca/device/hip/device_hip.h>
98 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 #include <parsec/mca/device/level_zero/device_level_zero.h>
102 #include <parsec/mca/device/device_gpu.h>
103 #if defined(PARSEC_PROF_TRACE)
104 #include <parsec/profiling.h>
105 #undef PARSEC_TTG_PROFILE_BACKEND
106 #if defined(PARSEC_PROF_GRAPHER)
107 #include <parsec/parsec_prof_grapher.h>
113 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
114 #include <unordered_set>
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
149 uint64_t
op_id = std::numeric_limits<uint64_t>::max();
170 static void unregister_parsec_tags(
void *_);
183 uint32_t taskpool_id,
188 :
tt_id(fn_id, taskpool_id,
tt_id, param_id, sender, num_keys)
194 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag,
void *
data,
long unsigned int size,
195 int src_rank,
void *obj) {
197 parsec_taskpool_t *tp = NULL;
199 uint64_t op_id = msg->
op_id;
203 if (PARSEC_TERM_TP_NOT_READY != tp->tdm.module->taskpool_state(tp)) {
207 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
208 static_set_arg_fct = op_pair.first;
209 static_set_arg_fct(
data,
size, op_pair.second);
210 tp->tdm.module->incoming_message_end(tp, NULL);
212 }
catch (
const std::out_of_range &e)
215 auto data_cpy = std::make_unique_for_overwrite<std::byte[]>(
size);
218 ", ", op_id,
", ", data_cpy,
", ",
size,
")");
224 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
225 int src,
void *cb_data);
228 static bool im =
false;
239 bool _task_profiling;
241 mpi_space_support = {
true,
false,
false};
243 int query_comm_size() {
245 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
249 int query_comm_rank() {
251 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
255 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine,
void *user_data)
261 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine,
void *user_data)
268 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
269 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
270 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
271 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
274 WorldImpl(
int *argc,
char **argv[],
int ncores, parsec_context_t *c =
nullptr)
277 , own_ctx(c == nullptr)
278 #if defined(PARSEC_PROF_TRACE)
279 , profiling_array(nullptr)
280 , profiling_array_size(0)
282 , _dag_profiling(false)
283 , _task_profiling(false)
286 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
290 #
if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
291 || MPIX_Query_cuda_support()
298 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
299 || MPIX_Query_hip_support()
305 #if defined(PARSEC_PROF_TRACE)
306 if(parsec_profile_enabled) {
308 #if defined(PARSEC_TTG_PROFILE_BACKEND)
309 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_SET_ARG_IMPL",
"fill:000000", 0, NULL,
310 (
int*)&parsec_ttg_profile_backend_set_arg_start,
311 (
int*)&parsec_ttg_profile_backend_set_arg_end);
312 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_BCAST_ARG_IMPL",
"fill:000000", 0, NULL,
313 (
int*)&parsec_ttg_profile_backend_bcast_arg_start,
314 (
int*)&parsec_ttg_profile_backend_bcast_arg_end);
315 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_DATACOPY",
"fill:000000",
316 sizeof(
size_t),
"size{int64_t}",
317 (
int*)&parsec_ttg_profile_backend_allocate_datacopy,
318 (
int*)&parsec_ttg_profile_backend_free_datacopy);
323 #ifdef PARSEC_PROF_GRAPHER
326 dot_param_idx = parsec_mca_param_find(
"profile", NULL,
"dot");
328 if (dot_param_idx != PARSEC_ERROR) {
330 parsec_mca_param_lookup_string(dot_param_idx, &filename);
335 if( NULL != parsec_ce.tag_register) {
349 assert(
nullptr == tpool);
350 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
351 tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
353 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
354 tpool->taskpool_name = strdup(
"TTG Taskpool");
355 parsec_taskpool_reserve_id(tpool);
357 tpool->devices_index_mask = 0;
358 for(
int i = 0; i < (int)parsec_nb_devices; i++) {
359 parsec_device_module_t *device = parsec_mca_device_get(i);
360 if( NULL == device )
continue;
361 tpool->devices_index_mask |= (1 << device->device_index);
364 #ifdef TTG_USE_USER_TERMDET
365 parsec_termdet_open_module(tpool,
"user_trigger");
367 parsec_termdet_open_dyn_module(tpool);
375 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
378 #if defined(PARSEC_PROF_TRACE)
379 tpool->profiling_array = profiling_array;
388 parsec_taskpool_started =
false;
408 MPI_Comm
comm()
const {
return MPI_COMM_WORLD; }
411 if (!parsec_taskpool_started) {
412 parsec_enqueue(ctx, tpool);
413 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
414 tpool->tdm.module->taskpool_ready(tpool);
415 [[maybe_unused]]
auto ret = parsec_context_start(ctx);
417 parsec_taskpool_started =
true;
422 #if defined(PARSEC_PROF_TRACE)
426 tpool->profiling_array =
nullptr;
428 assert(NULL != tpool->tdm.monitor);
429 tpool->tdm.module->unmonitor_taskpool(tpool);
430 parsec_taskpool_free(tpool);
436 if (parsec_taskpool_started) {
438 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
439 ttg::trace(
"ttg_parsec(", this->
rank(),
"): final waiting for completion");
441 parsec_context_wait(ctx);
443 parsec_taskpool_wait(tpool);
449 unregister_parsec_tags(
nullptr);
451 parsec_context_at_fini(unregister_parsec_tags,
nullptr);
453 #if defined(PARSEC_PROF_TRACE)
454 if(
nullptr != profiling_array) {
455 free(profiling_array);
456 profiling_array =
nullptr;
457 profiling_array_size = 0;
460 if (own_ctx) parsec_fini(&ctx);
476 virtual void dag_on(
const std::string &filename)
override {
477 #if defined(PARSEC_PROF_GRAPHER)
478 if(!_dag_profiling) {
480 size_t len = strlen(filename.c_str())+32;
481 char ext_filename[len];
482 snprintf(ext_filename, len,
"%s-%d.dot", filename.c_str(),
rank());
483 parsec_prof_grapher_init(ctx, ext_filename);
484 _dag_profiling =
true;
487 ttg::print(
"Error: requested to create '", filename,
"' to create a DAG of tasks,\n"
488 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
493 #if defined(PARSEC_PROF_GRAPHER)
495 parsec_prof_grapher_fini();
496 _dag_profiling =
false;
502 #if defined(PARSEC_PROF_TRACE)
503 _task_profiling =
false;
508 #if defined(PARSEC_PROF_TRACE)
509 _task_profiling =
true;
513 virtual bool profiling()
override {
return _task_profiling; }
516 return mpi_space_support[
static_cast<std::size_t
>(space)];
520 #ifdef TTG_USE_USER_TERMDET
521 if(parsec_taskpool_started) {
523 parsec_taskpool_started =
false;
528 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
531 #if defined(PARSEC_PROF_TRACE)
532 std::stringstream ss;
533 build_composite_name_rec(t->
ttg_ptr(), ss);
540 #if defined(PARSEC_PROF_TRACE)
541 void build_composite_name_rec(
const ttg::TTBase *t, std::stringstream &ss) {
544 build_composite_name_rec(t->
ttg_ptr(), ss);
548 void register_new_profiling_event(
const char *name,
int position) {
549 if(2*position >= profiling_array_size) {
550 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
551 profiling_array = (
int*)realloc((
void*)profiling_array,
552 new_profiling_array_size *
sizeof(int));
553 memset((
void*)&profiling_array[profiling_array_size], 0,
sizeof(
int)*(new_profiling_array_size - profiling_array_size));
554 profiling_array_size = new_profiling_array_size;
555 tpool->profiling_array = profiling_array;
558 assert(0 == tpool->profiling_array[2*position]);
559 assert(0 == tpool->profiling_array[2*position+1]);
563 parsec_profiling_add_dictionary_keyword(name,
"fill:000000", 64,
"key{char[64]}",
564 (
int*)&tpool->profiling_array[2*position],
565 (
int*)&tpool->profiling_array[2*position+1]);
571 if (!parsec_taskpool_started) {
572 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool has not been started, fence is a simple MPI_Barrier");
576 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool is ready for completion");
578 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
580 parsec_taskpool_wait(tpool);
593 parsec_context_t *ctx =
nullptr;
594 bool own_ctx =
false;
595 parsec_taskpool_t *tpool =
nullptr;
596 bool parsec_taskpool_started =
false;
597 #if defined(PARSEC_PROF_TRACE)
598 int *profiling_array;
599 std::size_t profiling_array_size;
603 static void unregister_parsec_tags(
void *_pidx)
605 if(NULL != parsec_ce.tag_unregister) {
614 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
622 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
630 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
638 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
648 if (task ==
nullptr ||
ptr ==
nullptr) {
653 if (NULL != copy && copy->get_ptr() ==
ptr) {
663 if (task ==
nullptr ||
ptr ==
nullptr) {
668 if (NULL != copy && copy->get_ptr() ==
ptr) {
676 if (task ==
nullptr || copy ==
nullptr) {
680 if (MAX_PARAM_COUNT < task->data_count) {
681 throw std::logic_error(
"Too many data copies, check MAX_PARAM_COUNT!");
693 if (copy == task->
copies[i]) {
703 task->
copies[i] =
nullptr;
707 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
708 #warning "ttg::PaRSEC enables data copy tracking"
709 static std::unordered_set<ttg_data_copy_t *> pending_copies;
710 static std::mutex pending_copies_mutex;
712 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
713 static int64_t parsec_ttg_data_copy_uid = 0;
716 template <
typename Value>
718 using value_type = std::decay_t<Value>;
721 std::is_constructible_v<value_type, decltype(value)>) {
722 copy =
new value_type(std::forward<Value>(value));
727 throw std::logic_error(
"Trying to copy-construct data that is not copy-constructible!");
729 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
732 copy->size =
sizeof(Value);
733 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
735 static_cast<uint64_t
>(copy->uid),
736 PROFILE_OBJECT_ID_NULL, ©->size,
737 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
740 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
742 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
743 auto rc = pending_copies.insert(copy);
744 assert(std::get<1>(rc));
751 template <std::size_t... IS,
typename Key = keyT>
753 int junk[] = {0, (invoke_pull_terminal<IS>(
754 std::get<IS>(input_terminals), key, task),
764 parsec_data_transfer_ownership_to_copy(
data, device, PARSEC_FLOW_ACCESS_RW);
766 data->device_copies[0]->version++;
771 template<
typename TT, std::size_t... Is>
776 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<Is, typename TT::input_refs_tuple_type>
> *>(
781 template<
typename TT>
782 inline parsec_hook_return_t
hook(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
784 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
787 return me->template invoke_op<ttg::ExecutionSpace::Host>();
790 template<
typename TT>
791 inline parsec_hook_return_t
hook_cuda(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
794 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
796 std::cerr <<
"CUDA hook called without having a CUDA op!" << std::endl;
797 return PARSEC_HOOK_RETURN_ERROR;
801 template<
typename TT>
802 inline parsec_hook_return_t
hook_hip(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
805 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
807 std::cerr <<
"HIP hook called without having a HIP op!" << std::endl;
808 return PARSEC_HOOK_RETURN_ERROR;
812 template<
typename TT>
813 inline parsec_hook_return_t
hook_level_zero(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
816 return me->template invoke_op<ttg::ExecutionSpace::L0>();
818 std::cerr <<
"L0 hook called without having a L0 op!" << std::endl;
819 return PARSEC_HOOK_RETURN_ERROR;
824 template<
typename TT>
825 inline parsec_hook_return_t
evaluate_cuda(
const parsec_task_t *parsec_task) {
828 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
830 return PARSEC_HOOK_RETURN_NEXT;
834 template<
typename TT>
835 inline parsec_hook_return_t
evaluate_hip(
const parsec_task_t *parsec_task) {
838 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
840 return PARSEC_HOOK_RETURN_NEXT;
844 template<
typename TT>
848 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
850 return PARSEC_HOOK_RETURN_NEXT;
855 template <
typename KeyT,
typename ActivationCallbackT>
857 std::vector<KeyT> _keylist;
858 std::atomic<int> _outstanding_transfers;
859 ActivationCallbackT _cb;
864 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
867 int left = --_outstanding_transfers;
869 _cb(std::move(_keylist), _copy);
876 template <
typename ActivationT>
877 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
878 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl,
size_t size,
int remote,
880 parsec_ce.mem_unregister(&lreg);
881 ActivationT *activation =
static_cast<ActivationT *
>(cb_data);
882 if (activation->complete_transfer()) {
885 return PARSEC_SUCCESS;
888 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
889 int src,
void *cb_data) {
890 std::intptr_t *fn_ptr =
static_cast<std::intptr_t *
>(msg);
891 std::function<void(
void)> *fn =
reinterpret_cast<std::function<
void(
void)
> *>(*fn_ptr);
894 return PARSEC_SUCCESS;
897 template <
typename FuncT>
898 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
899 int src,
void *cb_data) {
900 std::intptr_t *iptr =
static_cast<std::intptr_t *
>(msg);
901 FuncT *fn_ptr =
reinterpret_cast<FuncT *
>(*iptr);
904 return PARSEC_SUCCESS;
918 }
else if (readers == 1) {
924 if (1 == readers || readers == copy->
mutable_tag) {
925 std::atomic_thread_fence(std::memory_order_acquire);
938 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
940 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
941 size_t rc = pending_copies.erase(copy);
945 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
949 static_cast<uint64_t
>(copy->uid),
950 PROFILE_OBJECT_ID_NULL, ©->size,
951 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
959 template <
typename Value>
962 bool replace =
false;
964 assert(readers != 0);
968 bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->
defer_writer;
994 }
else if (!readonly) {
1008 if (1 == copy_in->
num_readers() && !defer_writer) {
1015 std::atomic_thread_fence(std::memory_order_release);
1029 if (NULL == copy_res) {
1031 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1038 for (
int i = 0; i < deferred_op->
data_count; ++i) {
1039 if (deferred_op->
copies[i] == copy_in) {
1040 deferred_op->
copies[i] = new_copy;
1053 copy_res = new_copy;
1057 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") +
typeid(std::decay_t<Value>).name() +
" but the type is not copyable");
1066 if (
detail::initialized_mpi())
throw std::runtime_error(
"ttg_parsec::ttg_initialize: can only be called once");
1069 int mpi_initialized;
1070 MPI_Initialized(&mpi_initialized);
1071 if (!mpi_initialized) {
1073 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
1075 throw std::runtime_error(
"ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1088 for (
int i = 0; i < parsec_nb_devices; ++i) {
1089 bool is_gpu = parsec_mca_device_is_gpu(i);
1093 throw std::runtime_error(
"PaRSEC: Found non-GPU device in GPU ID range!");
1098 const char* ttg_max_inline_cstr = std::getenv(
"TTG_MAX_INLINE");
1099 if (
nullptr != ttg_max_inline_cstr) {
1100 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
1106 bool all_peer_access =
true;
1108 for (
int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1109 parsec_device_module_t *idevice = parsec_mca_device_get(i);
1110 if (PARSEC_DEV_IS_GPU(idevice->type)) {
1111 parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
1112 for (
int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1114 parsec_device_module_t *jdevice = parsec_mca_device_get(j);
1115 if (PARSEC_DEV_IS_GPU(jdevice->type)) {
1116 all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true :
false;
1131 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1140 template <
typename T>
1142 world.
impl().register_ptr(
ptr);
1145 template <
typename T>
1147 world.
impl().register_ptr(std::move(
ptr));
1151 world.
impl().register_status(status_ptr);
1154 template <
typename Callback>
1156 world.
impl().register_callback(std::forward<Callback>(callback));
1162 double result = 0.0;
1163 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.
impl().comm());
1168 MPI_Barrier(world.
impl().comm());
1173 template <
typename T>
1176 if (world.
rank() == source_rank) {
1179 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.
impl().comm());
1181 unsigned char *buf =
new unsigned char[BUFLEN];
1182 if (world.
rank() == source_rank) {
1185 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.
impl().comm());
1186 if (world.
rank() != source_rank) {
1199 parsec_task_class_t
self;
1204 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs, ttg::ExecutionSpace Space>
1208 static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1209 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1211 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1214 static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1215 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1216 static_assert((ttg::meta::none_has_reference_v<input_valueTs>),
"Input typelist cannot contain reference types");
1217 static_assert(ttg::meta::is_none_Void_v<input_valueTs>,
"ttg::Void is for internal use only, do not use it");
1219 parsec_mempool_t mempools;
1222 template <
typename T>
1223 using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1225 template <
typename T>
1226 using have_hip_op_non_type_t = decltype(T::have_hip_op);
1228 template <
typename T>
1229 using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1233 static constexpr
int numinedges = std::tuple_size_v<input_tuple_type>;
1234 static constexpr
int numins = std::tuple_size_v<actual_input_tuple_type>;
1235 static constexpr
int numouts = std::tuple_size_v<output_terminalsT>;
1236 static constexpr
int numflows = std::max(numins, numouts);
1240 template<
typename DerivedT = derivedT>
1246 template<
typename DerivedT = derivedT>
1252 template<
typename DerivedT = derivedT>
1258 template<
typename DerivedT = derivedT>
1260 return (derived_has_cuda_op<DerivedT>() ||
1261 derived_has_hip_op<DerivedT>() ||
1262 derived_has_level_zero_op<DerivedT>());
1267 "Data sent from a device-capable template task must be serializable.");
1276 ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1278 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1283 std::tuple_size_v<input_refs_tuple_type>;
1289 template <std::
size_t i,
typename resultT,
typename InTuple>
1290 static resultT
get(InTuple &&intuple) {
1291 return static_cast<resultT
>(std::get<i>(std::forward<InTuple>(intuple)));
1293 template <std::
size_t i,
typename InTuple>
1294 static auto &
get(InTuple &&intuple) {
1295 return std::get<i>(std::forward<InTuple>(intuple));
1305 constexpr
static const size_t task_key_offset =
sizeof(task_t);
1308 output_terminalsT output_terminals;
1314 template <std::size_t... IS>
1315 static constexpr
auto make_set_args_fcts(std::index_sequence<IS...>) {
1316 using resultT = decltype(set_arg_from_msg_fcts);
1317 return resultT{{&TT::set_arg_from_msg<IS>...}};
1319 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_arg_from_msg_fcts =
1320 make_set_args_fcts(std::make_index_sequence<numins>{});
1322 template <std::size_t... IS>
1323 static constexpr
auto make_set_size_fcts(std::index_sequence<IS...>) {
1324 using resultT = decltype(set_argstream_size_from_msg_fcts);
1325 return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
1327 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1328 make_set_size_fcts(std::make_index_sequence<numins>{});
1330 template <std::size_t... IS>
1331 static constexpr
auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1332 using resultT = decltype(finalize_argstream_from_msg_fcts);
1333 return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
1335 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1336 make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
1338 template <std::size_t... IS>
1339 static constexpr
auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1340 using resultT = decltype(get_from_pull_msg_fcts);
1341 return resultT{{&TT::get_from_pull_msg<IS>...}};
1343 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1344 make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
1346 template<std::size_t... IS>
1347 constexpr
static auto make_input_is_const(std::index_sequence<IS...>) {
1348 using resultT = decltype(input_is_const);
1349 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1351 constexpr
static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1354 ttg::meta::detail::keymap_t<keyT> keymap;
1355 ttg::meta::detail::keymap_t<keyT> priomap;
1356 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1358 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1360 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = {
nullptr };
1361 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1362 int num_pullins = 0;
1366 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1367 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1376 template <
typename... Args>
1377 auto op(Args &&...args) {
1378 derivedT *derived =
static_cast<derivedT *
>(
this);
1379 using return_type = decltype(derived->op(std::forward<Args>(args)...));
1380 if constexpr (std::is_same_v<return_type,void>) {
1381 derived->op(std::forward<Args>(args)...);
1385 return derived->op(std::forward<Args>(args)...);
1389 template <std::
size_t i,
typename terminalT,
typename Key>
1390 void invoke_pull_terminal(terminalT &
in,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1391 if (
in.is_pull_terminal) {
1392 auto owner =
in.container.owner(key);
1393 if (owner != world.rank()) {
1394 get_pull_terminal_data_from<i>(owner, key);
1397 set_arg<i>(key, (
in.container).get(key));
1402 template <std::
size_t i,
typename Key>
1403 void get_pull_terminal_data_from(
const int owner,
1405 using msg_t = detail::msg_t;
1406 auto &world_impl = world.impl();
1407 parsec_taskpool_t *tp = world_impl.taskpool();
1408 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), tp->taskpool_id,
1413 pos =
pack(key, msg->bytes, pos);
1414 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1415 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1416 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
1417 sizeof(msg_header_t) + pos);
1420 template <std::size_t... IS,
typename Key = keyT>
1421 void invoke_pull_terminals(std::index_sequence<IS...>,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1422 int junk[] = {0, (invoke_pull_terminal<IS>(
1423 std::get<IS>(input_terminals), key, task),
1428 template <std::size_t... IS>
1429 static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1431 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>
> *>(
1432 task->copies[IS]->get_ptr()))...};
1435 #ifdef TTG_HAVE_DEVICE
1439 static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
1440 parsec_gpu_task_t *gpu_task,
1441 parsec_gpu_exec_stream_t *gpu_stream) {
1443 task_t *task = (task_t*)gpu_task->ec;
1445 ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1447 task->dev_ptr->stream = gpu_stream;
1452 auto dev_data = dev_task.promise();
1455 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1456 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1458 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1460 parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
1464 #elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1466 parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
1470 #elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1472 parsec_level_zero_exec_stream_t *stream;
1473 stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
1480 static_op(&task->parsec_task);
1484 auto discard_tmp_flows = [&](){
1485 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1488 const_cast<parsec_flow_t*
>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
1489 task->parsec_task.data[i].data_out->readers = 1;
1495 int rc = PARSEC_HOOK_RETURN_DONE;
1496 if (
nullptr != task->suspended_task_address) {
1498 dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1499 dev_data = dev_task.promise();
1501 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1502 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1503 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1505 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1506 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1509 discard_tmp_flows();
1512 rc = PARSEC_HOOK_RETURN_AGAIN;
1517 discard_tmp_flows();
1522 static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
1524 task_t *task = (task_t*)parsec_task;
1525 if (task->dev_ptr->gpu_task ==
nullptr) {
1528 parsec_gpu_task_t *gpu_task;
1530 gpu_task =
static_cast<parsec_gpu_task_t*
>(std::calloc(1,
sizeof(*gpu_task)));
1531 PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
1532 gpu_task->ec = parsec_task;
1533 gpu_task->task_type = 0;
1534 gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max();
1535 gpu_task->pushout = 0;
1536 gpu_task->submit = &TT::device_static_submit;
1545 task->dev_ptr->gpu_task = gpu_task;
1548 task->parsec_task.chore_mask = PARSEC_DEV_ALL;
1551 task->dev_ptr->task_class = *task->parsec_task.task_class;
1554 static_op(parsec_task);
1558 parsec_task_class_t& tc = task->dev_ptr->task_class;
1561 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1562 tc.in[i] = gpu_task->flow[i];
1563 tc.out[i] = gpu_task->flow[i];
1565 tc.nb_flows = MAX_PARAM_COUNT;
1569 if (tt->devicemap) {
1571 if constexpr (std::is_void_v<keyT>) {
1576 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1578 if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
1579 parsec_data_t *
data = parsec_task->data[i].data_in->original;
1583 if (
data->owner_device == 0) {
1584 parsec_advise_data_on_device(
data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
1591 task->parsec_task.task_class = &task->dev_ptr->task_class;
1594 return PARSEC_HOOK_RETURN_DONE;
1597 std::cerr <<
"EVALUATE called on task with assigned GPU task!" << std::endl;
1600 return PARSEC_HOOK_RETURN_ERROR;
1604 static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
1609 task_t *task = (task_t*)parsec_task;
1611 if (
nullptr == task->suspended_task_address) {
1613 return PARSEC_HOOK_RETURN_DONE;
1617 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1620 ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1622 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1623 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1625 return PARSEC_HOOK_RETURN_DONE;
1628 parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1629 assert(NULL != device);
1631 task->dev_ptr->device = device;
1632 parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1633 parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
1635 switch(device->super.type) {
1637 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1638 case PARSEC_DEV_CUDA:
1642 gpu_task->stage_in = parsec_default_gpu_stage_in;
1643 gpu_task->stage_out = parsec_default_gpu_stage_out;
1644 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1648 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1649 case PARSEC_DEV_HIP:
1651 gpu_task->stage_in = parsec_default_gpu_stage_in;
1652 gpu_task->stage_out = parsec_default_gpu_stage_out;
1653 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1657 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1658 case PARSEC_DEV_LEVEL_ZERO:
1660 gpu_task->stage_in = parsec_default_gpu_stage_in;
1661 gpu_task->stage_out = parsec_default_gpu_stage_out;
1662 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1669 ttg::print_error(task->tt->get_name(),
" : received mismatching device type ", (
int)device->super.type,
" from PaRSEC");
1671 return PARSEC_HOOK_RETURN_DONE;
1675 static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1677 task_t *task = (task_t*)parsec_task;
1678 void* suspended_task_address =
1679 #ifdef TTG_HAVE_COROUTINE
1680 task->suspended_task_address;
1685 if (suspended_task_address ==
nullptr) {
1687 ttT *baseobj = task->tt;
1688 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1691 if (obj->tracing()) {
1692 if constexpr (!ttg::meta::is_void_v<keyT>)
1693 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": executing");
1695 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : executing");
1698 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1699 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1700 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
1701 }
else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1702 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1703 }
else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1704 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1705 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
1706 }
else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1715 #ifdef TTG_HAVE_COROUTINE
1718 #ifdef TTG_HAVE_DEVICE
1720 ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1724 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1725 task->tt->set_outputs_tls_ptr();
1727 if (coro.completed()) {
1729 suspended_task_address =
nullptr;
1731 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1737 assert(ret.ready());
1738 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1739 task->tt->set_outputs_tls_ptr();
1741 if (ret.completed()) {
1743 suspended_task_address =
nullptr;
1751 for (
auto &event_ptr : events) {
1752 event_ptr->finish();
1756 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1758 task->suspended_task_address = suspended_task_address;
1766 #ifdef TTG_HAVE_COROUTINE
1767 task->suspended_task_address = suspended_task_address;
1769 if (suspended_task_address ==
nullptr) {
1770 ttT *baseobj = task->tt;
1771 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1772 if (obj->tracing()) {
1773 if constexpr (!ttg::meta::is_void_v<keyT>)
1774 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": done executing");
1776 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1780 return PARSEC_HOOK_RETURN_DONE;
1783 static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1784 task_t *task =
static_cast<task_t*
>(parsec_task);
1786 void* suspended_task_address =
1787 #ifdef TTG_HAVE_COROUTINE
1788 task->suspended_task_address;
1792 if (suspended_task_address ==
nullptr) {
1793 ttT *baseobj = (
ttT *)task->object_ptr;
1794 derivedT *obj = (derivedT *)task->object_ptr;
1797 if constexpr (!ttg::meta::is_void_v<keyT>) {
1798 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1799 }
else if constexpr (ttg::meta::is_void_v<keyT>) {
1806 #ifdef TTG_HAVE_COROUTINE
1808 assert(ret.ready());
1810 if (ret.completed()) {
1812 suspended_task_address =
nullptr;
1821 task->suspended_task_address = suspended_task_address;
1823 if (suspended_task_address) {
1826 return PARSEC_HOOK_RETURN_AGAIN;
1829 return PARSEC_HOOK_RETURN_DONE;
1832 template <std::
size_t i>
1833 static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1834 using rtask_t = detail::reducer_task_t;
1835 using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1836 constexpr
const bool val_is_void = ttg::meta::is_void_v<value_t>;
1837 constexpr
const bool input_is_const = std::is_const_v<value_t>;
1838 rtask_t *rtask = (rtask_t*)parsec_task;
1839 task_t *parent_task =
static_cast<task_t*
>(rtask->parent_task);
1840 ttT *baseobj = parent_task->tt;
1841 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1843 auto& reducer = std::get<i>(baseobj->input_reducers);
1847 if (obj->tracing()) {
1848 if constexpr (!ttg::meta::is_void_v<keyT>)
1849 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": reducer executing");
1851 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : reducer executing");
1855 detail::ttg_data_copy_t *target_copy;
1856 target_copy = parent_task->copies[i];
1857 assert(val_is_void ||
nullptr != target_copy);
1860 std::size_t
size = 0;
1861 assert(parent_task->streams[i].reduce_count > 0);
1862 if (rtask->is_first) {
1863 if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1865 if (obj->tracing()) {
1866 if constexpr (!ttg::meta::is_void_v<keyT>)
1867 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": first reducer empty");
1869 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : first reducer empty");
1872 return PARSEC_HOOK_RETURN_DONE;
1880 if constexpr(!val_is_void) {
1882 detail::ttg_data_copy_t *source_copy;
1883 parsec_list_item_t *item;
1884 item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
1885 if (
nullptr == item) {
1889 source_copy = ((detail::ttg_data_copy_self_t *)(item))->
self;
1890 assert(target_copy->num_readers() == target_copy->mutable_tag);
1891 assert(source_copy->num_readers() > 0);
1892 reducer(*
reinterpret_cast<std::decay_t<value_t> *
>(target_copy->get_ptr()),
1893 *
reinterpret_cast<std::decay_t<value_t> *
>(source_copy->get_ptr()));
1895 }
else if constexpr(val_is_void) {
1899 size = ++parent_task->streams[i].size;
1901 }
while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1905 bool complete = (
size >= parent_task->streams[i].goal);
1910 if (complete && c == 0) {
1911 if constexpr(input_is_const) {
1913 target_copy->reset_readers();
1916 parent_task->remove_from_hash =
true;
1917 parent_task->release_task(parent_task);
1922 if (obj->tracing()) {
1923 if constexpr (!ttg::meta::is_void_v<keyT>)
1924 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": done executing");
1926 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1929 return PARSEC_HOOK_RETURN_DONE;
1934 template <
typename T>
1935 uint64_t
unpack(T &obj,
void *_bytes, uint64_t pos) {
1937 uint64_t payload_size;
1938 if constexpr (!dd_t::serialize_size_is_const) {
1941 payload_size = dd_t::payload_size(&obj);
1943 pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
1947 template <
typename T>
1950 uint64_t payload_size = dd_t::payload_size(&obj);
1951 if constexpr (!dd_t::serialize_size_is_const) {
1954 pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
1960 "Trying to unpack as message that does not hold enough bytes to represent a single header");
1962 derivedT *obj =
reinterpret_cast<derivedT *
>(bop);
1963 switch (hd->
fn_id) {
1967 assert(hd->
param_id < obj->set_arg_from_msg_fcts.size());
1968 auto member = obj->set_arg_from_msg_fcts[hd->
param_id];
1978 assert(hd->
param_id < obj->set_argstream_size_from_msg_fcts.size());
1979 auto member = obj->set_argstream_size_from_msg_fcts[hd->
param_id];
1985 assert(hd->
param_id < obj->finalize_argstream_from_msg_fcts.size());
1986 auto member = obj->finalize_argstream_from_msg_fcts[hd->
param_id];
1992 assert(hd->
param_id < obj->get_from_pull_msg_fcts.size());
1993 auto member = obj->get_from_pull_msg_fcts[hd->
param_id];
2004 auto &world_impl = world.impl();
2005 parsec_execution_stream_s *es = world_impl.execution_stream();
2006 int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
2007 return &mempools.thread_mempools[index];
2010 template <
size_t i,
typename valueT>
2014 parsec_execution_stream_s *es = world.impl().execution_stream();
2016 dummy =
new (parsec_thread_mempool_allocate(mempool))
task_t(mempool, &this->
self,
this);
2024 dummy->
parsec_task.taskpool = world.impl().taskpool();
2031 parsec_task_t *task_ring =
nullptr;
2032 for (
auto &&key : keylist) {
2034 if constexpr (std::is_copy_constructible_v<valueT>) {
2035 set_arg_local_impl<i>(key, *
reinterpret_cast<valueT *
>(copy->
get_ptr()), copy, &task_ring);
2039 static_assert(!std::is_reference_v<valueT>);
2041 set_arg_local_impl<i>(key, std::move(*
reinterpret_cast<valueT *
>(copy->
get_ptr())), copy, &task_ring);
2043 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") +
typeid(std::decay_t<valueT>).name() +
" but the type is not copyable");
2048 if (
nullptr != task_ring) {
2049 auto &world_impl = world.impl();
2050 parsec_task_t *vp_task_ring[1] = { task_ring };
2051 __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2058 complete_task_and_release(es, &dummy->
parsec_task);
2059 parsec_thread_mempool_free(mempool, &dummy->
parsec_task);
2071 template <std::
size_t i>
2073 using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2075 msg_t *msg =
static_cast<msg_t *
>(
data);
2076 if constexpr (!ttg::meta::is_void_v<keyT>) {
2080 uint64_t key_end_pos;
2081 std::vector<keyT> keylist;
2082 int num_keys = msg->tt_id.num_keys;
2083 keylist.reserve(num_keys);
2084 auto rank = world.rank();
2085 for (
int k = 0; k < num_keys; ++k) {
2087 pos =
unpack(key, msg->bytes, pos);
2088 assert(keymap(key) ==
rank);
2089 keylist.push_back(std::move(key));
2095 if constexpr (!ttg::meta::is_void_v<valueT>) {
2096 using decvalueT = std::decay_t<valueT>;
2097 int32_t num_iovecs = msg->tt_id.num_iovecs;
2102 using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2105 metadata_t metadata;
2106 pos =
unpack(metadata, msg->bytes, pos);
2115 parsec_gpu_data_copy_t* gpu_elem;
2116 gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2119 while (i < parsec_nb_devices) {
2120 if (
nullptr == gpu_elem) {
2121 gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2122 gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2123 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2124 gpu_elem->version = 0;
2125 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2127 if (
nullptr == gpu_elem->device_private) {
2128 gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2129 if (
nullptr == gpu_elem->device_private) {
2138 pos =
unpack(*
static_cast<decvalueT *
>(copy->
get_ptr()), msg->bytes, pos);
2141 if (num_iovecs == 0) {
2142 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2147 int remote = msg->tt_id.sender;
2148 assert(remote < world.size());
2150 auto &val = *
static_cast<decvalueT *
>(copy->
get_ptr());
2152 bool inline_data = msg->tt_id.inline_data;
2155 parsec_ce_tag_t cbtag;
2157 auto create_activation_fn = [&]() {
2159 std::memcpy(&cbtag, msg->bytes + pos,
sizeof(cbtag));
2160 pos +=
sizeof(cbtag);
2165 set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2166 this->world.impl().decrement_inflight_msg();
2170 auto read_inline_data = [&](
auto&& iovec){
2173 std::memcpy(iovec.data, msg->bytes + pos, iovec.num_bytes);
2174 pos += iovec.num_bytes;
2176 auto handle_iovec_fn = [&](
auto&& iovec,
auto activation) {
2177 using ActivationT = std::decay_t<decltype(*activation)>;
2180 parsec_ce_mem_reg_handle_t rreg;
2181 int32_t rreg_size_i;
2182 std::memcpy(&rreg_size_i, msg->bytes + pos,
sizeof(rreg_size_i));
2183 pos +=
sizeof(rreg_size_i);
2184 rreg =
static_cast<parsec_ce_mem_reg_handle_t
>(msg->bytes + pos);
2188 std::intptr_t fn_ptr;
2189 std::memcpy(&fn_ptr, msg->bytes + pos,
sizeof(fn_ptr));
2190 pos +=
sizeof(fn_ptr);
2193 parsec_ce_mem_reg_handle_t lreg;
2195 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2196 iovec.num_bytes, &lreg, &lreg_size);
2197 world.impl().increment_inflight_msg();
2200 parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iovec.num_bytes, remote,
2201 &detail::get_complete_cb<ActivationT>, activation,
2203 cbtag, &fn_ptr,
sizeof(std::intptr_t));
2208 for (
auto&& iov : descr.get_data(val)) {
2209 read_inline_data(iov);
2212 auto activation = create_activation_fn();
2213 for (
auto&& iov : descr.get_data(val)) {
2214 handle_iovec_fn(iov, activation);
2223 auto activation = create_activation_fn();
2225 handle_iovec_fn(
ttg::iovec{
data->nb_elts,
data->device_copies[
data->owner_device]->device_private}, activation);
2230 assert(num_iovecs == nv);
2234 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2238 }
else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2239 for (
auto &&key : keylist) {
2240 set_arg<i, keyT, ttg::Void>(key,
ttg::Void{});
2244 }
else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2245 using decvalueT = std::decay_t<valueT>;
2248 unpack(val, msg->bytes, 0);
2249 set_arg<i, keyT, valueT>(std::move(val));
2251 }
else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2252 set_arg<i, keyT, ttg::Void>(
ttg::Void{});
2258 template <std::
size_t i>
2261 msg_t *msg =
static_cast<msg_t *
>(
data);
2262 if constexpr (!ttg::meta::is_void_v<keyT>) {
2265 auto rank = world.rank();
2267 pos =
unpack(key, msg->bytes, pos);
2268 assert(keymap(key) ==
rank);
2269 finalize_argstream<i>(key);
2271 auto rank = world.rank();
2272 assert(keymap() ==
rank);
2273 finalize_argstream<i>();
2277 template <std::
size_t i>
2280 auto msg =
static_cast<msg_t *
>(
data);
2282 if constexpr (!ttg::meta::is_void_v<keyT>) {
2284 auto rank = world.rank();
2286 pos =
unpack(key, msg->bytes, pos);
2287 assert(keymap(key) ==
rank);
2288 std::size_t argstream_size;
2289 pos =
unpack(argstream_size, msg->bytes, pos);
2290 set_argstream_size<i>(key, argstream_size);
2292 auto rank = world.rank();
2293 assert(keymap() ==
rank);
2294 std::size_t argstream_size;
2295 pos =
unpack(argstream_size, msg->bytes, pos);
2296 set_argstream_size<i>(argstream_size);
2300 template <std::
size_t i>
2303 msg_t *msg =
static_cast<msg_t *
>(
data);
2304 auto &
in = std::get<i>(input_terminals);
2305 if constexpr (!ttg::meta::is_void_v<keyT>) {
2309 pos =
unpack(key, msg->bytes, pos);
2310 set_arg<i>(key, (
in.container).get(key));
2314 template <std::
size_t i,
typename Key,
typename Value>
2315 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2316 const Key &key, Value &&value) {
2317 set_arg_local_impl<i>(key, std::forward<Value>(value));
2320 template <std::
size_t i,
typename Key = keyT,
typename Value>
2321 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2323 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2326 template <std::
size_t i,
typename Key,
typename Value>
2327 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2328 const Key &key,
const Value &value) {
2329 set_arg_local_impl<i>(key, value);
2332 template <std::
size_t i,
typename Key = keyT,
typename Value>
2333 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2334 const Value &value) {
2335 set_arg_local_impl<i>(
ttg::Void{}, value);
2338 template <std::
size_t i,
typename Key = keyT,
typename Value>
2339 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2340 std::shared_ptr<const Value> &valueptr) {
2341 set_arg_local_impl<i>(
ttg::Void{}, *valueptr);
2344 template <
typename Key>
2346 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2347 auto &world_impl = world.impl();
2350 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2351 int32_t priority = 0;
2352 if constexpr (!keyT_is_Void) {
2353 priority = priomap(key);
2355 newtask =
new (taskobj)
task_t(key, mempool, &this->
self, world_impl.taskpool(),
this, priority);
2357 priority = priomap();
2359 newtask =
new (taskobj)
task_t(mempool, &this->
self, world_impl.taskpool(),
this, priority);
2362 for (
int i = 0; i < static_stream_goal.size(); ++i) {
2363 newtask->
streams[i].goal = static_stream_goal[i];
2371 template <std::
size_t i>
2375 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2376 auto &world_impl = world.impl();
2379 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2381 int32_t priority = 0;
2382 if constexpr (!keyT_is_Void) {
2383 priority = priomap(task->
key);
2386 priority = priomap();
2391 world_impl.taskpool(), priority, is_first);
2398 template <std::
size_t i,
typename Key,
typename Value>
2400 parsec_task_t **task_ring =
nullptr) {
2401 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2402 constexpr
const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2403 constexpr
const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2404 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2407 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": received value for argument : ", i);
2409 parsec_key_t hk = 0;
2410 if constexpr (!keyT_is_Void) {
2411 hk =
reinterpret_cast<parsec_key_t
>(&key);
2412 assert(keymap(key) == world.rank());
2416 auto &world_impl = world.impl();
2417 auto &reducer = std::get<i>(input_reducers);
2419 bool remove_from_hash =
true;
2420 #if defined(PARSEC_PROF_GRAPHER)
2421 bool discover_task =
true;
2423 bool get_pull_data =
false;
2424 bool has_lock =
false;
2426 if (numins > 1 || reducer) {
2429 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
2431 world_impl.increment_created();
2434 if( world_impl.dag_profiling() ) {
2435 #if defined(PARSEC_PROF_GRAPHER)
2436 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2440 }
else if (!reducer && numins == (task->
in_data_count + 1)) {
2442 parsec_hash_table_nolock_remove(&
tasks_table, hk);
2443 remove_from_hash =
false;
2447 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2452 world_impl.increment_created();
2453 remove_from_hash =
false;
2454 if( world_impl.dag_profiling() ) {
2455 #if defined(PARSEC_PROF_GRAPHER)
2456 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2462 if( world_impl.dag_profiling() ) {
2463 #if defined(PARSEC_PROF_GRAPHER)
2468 if(orig_index >= 0) {
2469 snprintf(orig_str, 32,
"%d", orig_index);
2471 strncpy(orig_str,
"_", 32);
2473 snprintf(dest_str, 32,
"%lu", i);
2474 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2475 .flow_index = 0, .flow_datatype_mask = ~0 };
2476 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2477 .flow_index = 0, .flow_datatype_mask = ~0 };
2488 if (
nullptr != copy) {
2490 copy = detail::register_data_copy<valueT>(copy, task,
is_const);
2501 if (reducer && 1 != task->
streams[i].goal) {
2502 auto submit_reducer_task = [&](
auto *parent_task){
2504 std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2509 reduce_task = create_new_reducer_task<i>(parent_task,
false);
2514 if constexpr (!ttg::meta::is_void_v<valueT>) {
2517 if (
nullptr == (copy = task->
copies[i])) {
2518 using decay_valueT = std::decay_t<valueT>;
2523 reduce_task = create_new_reducer_task<i>(task,
true);
2527 task->
streams[i].reduce_count.store(1, std::memory_order_relaxed);
2543 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2546 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2552 parsec_lifo_push(&task->
streams[i].reduce_copies, ©->
super);
2553 submit_reducer_task(task);
2557 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2559 submit_reducer_task(task);
2569 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2572 if constexpr (!valueT_is_Void) {
2573 if (
nullptr != task->
copies[i]) {
2575 throw std::logic_error(
"bad set arg");
2595 if constexpr (!ttg::meta::is_void_v<keyT>) {
2596 if (get_pull_data) {
2603 bool constrained =
false;
2604 if (constraints_check.size() > 0) {
2605 if constexpr (ttg::meta::is_void_v<keyT>) {
2606 constrained = !constraints_check[0]();
2608 constrained = !constraints_check[0](task->
key);
2615 return !constrained;
2618 template<
typename Key = keyT>
2621 assert(cid < constraints_check.size());
2623 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2624 if (!constraints_check[i]()) {
2632 parsec_key_t hk = 0;
2634 assert(task !=
nullptr);
2635 auto &world_impl = world.impl();
2636 parsec_execution_stream_t *es = world_impl.execution_stream();
2637 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2638 __parsec_schedule_vp(es, vp_task_rings, 0);
2642 template<
typename Key = keyT>
2643 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
release_constraint(std::size_t cid,
const std::span<Key>& keys) {
2644 assert(cid < constraints_check.size());
2645 parsec_task_t *task_ring =
nullptr;
2646 for (
auto& key : keys) {
2649 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2650 if (!constraints_check[i](key)) {
2658 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
2660 assert(task !=
nullptr);
2661 if (task_ring ==
nullptr) {
2666 parsec_list_item_ring_push_sorted(&task_ring->super, &task->
parsec_task.super,
2667 offsetof(parsec_task_t, priority));
2671 if (
nullptr != task_ring) {
2672 auto &world_impl = world.impl();
2673 parsec_execution_stream_t *es = world_impl.execution_stream();
2674 parsec_task_t *vp_task_rings[1] = { task_ring };
2675 __parsec_schedule_vp(es, vp_task_rings, 0);
2680 parsec_task_t **task_ring =
nullptr) {
2681 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2690 count = parsec_atomic_fetch_inc_int32(&task->
in_data_count) + 1;
2691 assert(count <=
self.dependencies_goal);
2694 auto &world_impl = world.impl();
2695 ttT *baseobj = task->
tt;
2697 if (count == numins) {
2698 parsec_execution_stream_t *es = world_impl.execution_stream();
2699 parsec_key_t hk = task->
pkey();
2701 if constexpr (!keyT_is_Void) {
2710 if (
nullptr == task_ring) {
2711 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2712 __parsec_schedule_vp(es, vp_task_rings, 0);
2713 }
else if (*task_ring ==
nullptr) {
2718 parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->
parsec_task.super,
2719 offsetof(parsec_task_t, priority));
2722 }
else if constexpr (!ttg::meta::is_void_v<keyT>) {
2723 if ((baseobj->num_pullins + count == numins) && baseobj->
is_lazy_pull()) {
2731 template <std::
size_t i,
typename Key,
typename Value>
2732 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
const Key &key,
2734 set_arg_impl<i>(key, std::forward<Value>(value));
2738 template <std::
size_t i,
typename Key,
typename Value>
2739 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(Value &&value) {
2740 set_arg_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2743 template <std::
size_t i,
typename Key = keyT>
2744 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_arg() {
2749 template <std::
size_t i,
typename Key>
2750 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
set_arg(
const Key &key) {
2754 template<
typename Value,
typename Key>
2761 using decvalueT = std::decay_t<Value>;
2762 bool inline_data =
false;
2764 std::size_t iov_size = 0;
2765 std::size_t metadata_size = 0;
2768 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2769 iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2770 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2771 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2780 std::size_t pack_size = key_pack_size + metadata_size + iov_size;
2788 template <std::
size_t i,
typename Key,
typename Value>
2791 using decvalueT = std::decay_t<Value>;
2792 using norefvalueT = std::remove_reference_t<Value>;
2793 norefvalueT *value_ptr = &value;
2795 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2796 if(world.impl().profiling()) {
2797 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2801 if constexpr (!ttg::meta::is_void_v<Key>)
2802 owner = keymap(key);
2805 if (owner == world.rank()) {
2806 if constexpr (!ttg::meta::is_void_v<keyT>)
2807 set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2809 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value), copy_in);
2810 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2811 if(world.impl().profiling()) {
2812 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2821 auto &world_impl = world.impl();
2824 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2827 if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2831 if (
nullptr == copy) {
2833 if (
nullptr == copy) {
2837 value_ptr =
static_cast<norefvalueT*
>(copy->
get_ptr());
2842 msg->tt_id.inline_data = inline_data;
2844 auto write_header_fn = [&]() {
2849 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
2850 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
2851 pos +=
sizeof(cbtag);
2854 auto handle_iovec_fn = [&](
auto&& iovec){
2858 std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
2859 pos += iovec.num_bytes;
2866 copy = detail::register_data_copy<decvalueT>(copy,
nullptr,
true);
2867 parsec_ce_mem_reg_handle_t lreg;
2870 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2871 iovec.num_bytes, &lreg, &lreg_size);
2872 auto lreg_ptr = std::shared_ptr<void>{lreg, [](
void *
ptr) {
2873 parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)
ptr;
2874 parsec_ce.mem_unregister(&memreg);
2876 int32_t lreg_size_i = lreg_size;
2877 std::memcpy(msg->bytes + pos, &lreg_size_i,
sizeof(lreg_size_i));
2878 pos +=
sizeof(lreg_size_i);
2879 std::memcpy(msg->bytes + pos, lreg, lreg_size);
2883 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
2889 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
2890 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
2891 pos +=
sizeof(fn_ptr);
2897 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2898 num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2900 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2901 pos =
pack(metadata, msg->bytes, pos);
2904 for (
auto&& iov : iovs) {
2905 handle_iovec_fn(iov);
2909 pos =
pack(*value_ptr, msg->bytes, pos, copy);
2919 msg->tt_id.num_iovecs = num_iovecs;
2923 msg->tt_id.num_keys = 0;
2924 msg->tt_id.key_offset = pos;
2925 if constexpr (!ttg::meta::is_void_v<Key>) {
2926 size_t tmppos =
pack(key, msg->bytes, pos);
2928 msg->tt_id.num_keys = 1;
2931 parsec_taskpool_t *tp = world_impl.taskpool();
2932 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2933 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2935 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
2937 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2938 if(world.impl().profiling()) {
2939 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2942 #if defined(PARSEC_PROF_GRAPHER)
2947 if(orig_index >= 0) {
2948 snprintf(orig_str, 32,
"%d", orig_index);
2950 strncpy(orig_str,
"_", 32);
2952 snprintf(dest_str, 32,
"%lu", i);
2953 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2954 .flow_index = 0, .flow_datatype_mask = ~0 };
2955 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2956 .flow_index = 0, .flow_datatype_mask = ~0 };
2964 template <
int i,
typename Iterator,
typename Value>
2966 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2967 if(world.impl().profiling()) {
2968 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2971 parsec_task_t *task_ring =
nullptr;
2977 for (
auto it = begin; it != end; ++it) {
2978 set_arg_local_impl<i>(*it, value, copy, &task_ring);
2981 if (
nullptr != task_ring) {
2982 parsec_task_t *vp_task_ring[1] = { task_ring };
2983 __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2985 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2986 if(world.impl().profiling()) {
2987 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2992 template <std::
size_t i,
typename Key,
typename Value>
2993 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2996 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2998 auto np = world.size();
2999 int rank = world.rank();
3001 bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3002 [&](
const Key &key) { return keymap(key) != rank; });
3005 using decvalueT = std::decay_t<Value>;
3008 std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
3009 std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](
const Key &a,
const Key &b)
mutable {
3010 int rank_a = keymap(a);
3011 int rank_b = keymap(b);
3013 int pos_a = (rank_a + np - rank) % np;
3014 int pos_b = (rank_b + np - rank) % np;
3015 return pos_a < pos_b;
3019 auto local_begin = keylist_sorted.end();
3020 auto local_end = keylist_sorted.end();
3022 int32_t num_iovs = 0;
3026 assert(
nullptr != copy);
3029 auto &world_impl = world.impl();
3030 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3035 bool inline_data =
can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
3036 msg->tt_id.inline_data = inline_data;
3038 std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
3039 auto write_iov_header = [&](){
3044 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
3045 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
3046 pos +=
sizeof(cbtag);
3049 auto handle_iov_fn = [&](
auto&& iovec){
3052 std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
3053 pos += iovec.num_bytes;
3055 parsec_ce_mem_reg_handle_t lreg;
3057 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
3058 iovec.num_bytes, &lreg, &lreg_size);
3060 memregs.push_back(std::make_pair(
static_cast<int32_t
>(lreg_size),
3062 std::shared_ptr<void>{lreg, [](
void *
ptr) {
3063 parsec_ce_mem_reg_handle_t memreg =
3064 (parsec_ce_mem_reg_handle_t)
ptr;
3066 parsec_ce.mem_unregister(&memreg);
3075 auto metadata = descr.get_metadata(value);
3076 pos =
pack(metadata, msg->bytes, pos);
3077 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(&value));
3078 num_iovs = std::distance(std::begin(iovs), std::end(iovs));
3079 memregs.reserve(num_iovs);
3081 for (
auto &&iov : iovs) {
3087 pos =
pack(value, msg->bytes, pos, copy);
3089 memregs.reserve(num_iovs);
3093 data->device_copies[
data->owner_device]->device_private});
3097 msg->tt_id.num_iovecs = num_iovs;
3099 std::size_t save_pos = pos;
3101 parsec_taskpool_t *tp = world_impl.taskpool();
3102 for (
auto it = keylist_sorted.begin(); it < keylist_sorted.end(); ) {
3104 auto owner = keymap(*it);
3105 if (owner ==
rank) {
3109 std::find_if_not(++it, keylist_sorted.end(), [&](
const Key &key) { return keymap(key) == rank; });
3122 for (
int idx = 0; idx < num_iovs; ++idx) {
3125 std::shared_ptr<void> lreg_ptr;
3126 std::tie(lreg_size, lreg_ptr) = memregs[idx];
3127 std::memcpy(msg->bytes + pos, &lreg_size,
sizeof(lreg_size));
3128 pos +=
sizeof(lreg_size);
3129 std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3133 copy = detail::register_data_copy<valueT>(copy,
nullptr,
true);
3135 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
3141 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
3142 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
3143 pos +=
sizeof(fn_ptr);
3148 msg->tt_id.key_offset = pos;
3154 pos =
pack(*it, msg->bytes, pos);
3156 }
while (it < keylist_sorted.end() && keymap(*it) == owner);
3157 msg->tt_id.num_keys = num_keys;
3159 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3160 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3162 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3166 broadcast_arg_local<i>(local_begin, local_end, value);
3169 broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
3176 template <
typename Key,
typename... Ts,
size_t... Is,
size_t... Js>
3177 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>,
3178 std::index_sequence<Js...>,
const Key &key,
3179 const std::tuple<Ts...> &args) {
3180 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3181 constexpr
size_t js[] = {Js...};
3182 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3188 template <
typename Key,
typename... Ts,
size_t... Is>
3189 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
const Key &key,
3190 const std::tuple<Ts...> &args) {
3191 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3197 template <
typename Key = keyT,
typename... Ts,
size_t... Is,
size_t... Js>
3198 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3199 const std::tuple<Ts...> &args) {
3200 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3201 constexpr
size_t js[] = {Js...};
3202 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3208 template <
typename Key = keyT,
typename... Ts,
size_t... Is>
3209 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
3210 const std::tuple<Ts...> &args) {
3211 set_args(std::index_sequence_for<Ts...>{}, is, args);
3217 template <std::
size_t i>
3219 assert(std::get<i>(input_reducers) &&
"TT::set_static_argstream_size called on nonstreaming input terminal");
3220 assert(
size > 0 &&
"TT::set_static_argstream_size(key,size) called with size=0");
3222 this->
trace(world.rank(),
":",
get_name(),
": setting global stream size for terminal ", i);
3225 if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3227 throw std::runtime_error(
"TT::set_static_argstream_size called for a bounded stream");
3230 static_stream_goal[i] =
size;
3236 template <std::
size_t i,
typename Key>
3239 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3240 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3243 const auto owner = keymap(key);
3244 if (owner != world.rank()) {
3245 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : forwarding stream size for terminal ", i);
3247 auto &world_impl = world.impl();
3249 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3251 world_impl.rank(), 1);
3253 pos =
pack(key, msg->bytes, pos);
3255 parsec_taskpool_t *tp = world_impl.taskpool();
3256 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3257 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3258 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3261 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : setting stream size to ",
size,
" for terminal ", i);
3263 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3266 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3268 world.impl().increment_created();
3270 if( world.impl().dag_profiling() ) {
3271 #if defined(PARSEC_PROF_GRAPHER)
3272 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3276 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3286 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3288 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3297 template <std::
size_t i,
typename Key = keyT>
3300 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3301 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3304 const auto owner = keymap();
3305 if (owner != world.rank()) {
3306 ttg::trace(world.rank(),
":",
get_name(),
" : forwarding stream size for terminal ", i);
3308 auto &world_impl = world.impl();
3310 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3312 world_impl.rank(), 0);
3314 parsec_taskpool_t *tp = world_impl.taskpool();
3315 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3316 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3317 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3322 parsec_key_t hk = 0;
3325 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3327 world.impl().increment_created();
3329 if( world.impl().dag_profiling() ) {
3330 #if defined(PARSEC_PROF_GRAPHER)
3331 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3335 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3345 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3347 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3355 template <std::
size_t i,
typename Key>
3358 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3361 const auto owner = keymap(key);
3362 if (owner != world.rank()) {
3363 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": forwarding stream finalize for terminal ", i);
3365 auto &world_impl = world.impl();
3367 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3369 world_impl.rank(), 1);
3371 pos =
pack(key, msg->bytes, pos);
3372 parsec_taskpool_t *tp = world_impl.taskpool();
3373 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3374 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3375 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3378 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": finalizing stream for terminal ", i);
3380 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3385 " : error finalize called on stream that never received an input data: ", i);
3386 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3397 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3399 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3400 if (1 == c && (task->
streams[i].size >= 1)) {
3407 template <std::
size_t i,
bool key_is_
void = ttg::meta::is_
void_v<keyT>>
3410 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3413 const auto owner = keymap();
3414 if (owner != world.rank()) {
3415 ttg::trace(world.rank(),
":",
get_name(),
": forwarding stream finalize for terminal ", i);
3417 auto &world_impl = world.impl();
3419 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3421 world_impl.rank(), 0);
3422 parsec_taskpool_t *tp = world_impl.taskpool();
3423 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3424 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3425 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3430 auto hk =
static_cast<parsec_key_t
>(0);
3434 " : error finalize called on stream that never received an input data: ", i);
3435 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3446 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3448 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3449 if (1 == c && (task->
streams[i].size >= 1)) {
3455 template<
typename Value>
3460 auto check_parsec_data = [&](parsec_data_t*
data) {
3461 if (
data->owner_device != 0) {
3464 while (flowidx < MAX_PARAM_COUNT &&
3465 gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3472 if (flowidx == MAX_PARAM_COUNT) {
3473 throw std::runtime_error(
"Cannot add more than MAX_PARAM_COUNT flows to a task!");
3475 if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3478 gpu_task->flow_nb_elts[flowidx] =
data->nb_elts;
3481 ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3482 gpu_task->pushout |= 1<<flowidx;
3486 [&](parsec_data_t*
data){
3487 check_parsec_data(
data);
3493 template <std::
size_t i,
typename Value,
typename RemoteCheckFn>
3494 std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3497 constexpr
const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
3504 if (
nullptr == copy) {
3509 bool need_pushout =
false;
3517 auto &reducer = std::get<i>(input_reducers);
3525 if constexpr (value_is_const) {
3544 need_pushout =
true;
3551 need_pushout =
true;
3555 need_pushout =
true;
3562 need_pushout =
true;
3566 if (!need_pushout) {
3567 bool device_supported =
false;
3577 if (!device_supported) {
3578 need_pushout = remote_check();
3589 template <std::
size_t i,
typename Key,
typename Value>
3590 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3593 auto remote_check = [&](){
3595 int rank = world.rank();
3596 bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3597 [&](
const Key &key) { return keymap(key) != rank; });
3600 do_prepare_send<i>(value, remote_check);
3603 template <std::
size_t i,
typename Key,
typename Value>
3604 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3607 auto remote_check = [&](){
3609 int rank = world.rank();
3610 return (keymap() !=
rank);
3612 do_prepare_send<i>(value, remote_check);
3625 TT(
const TT &other) =
delete;
3626 TT &operator=(
const TT &other) =
delete;
3627 TT(
TT &&other) =
delete;
3628 TT &operator=(
TT &&other) =
delete;
3631 template <
typename terminalT, std::
size_t i>
3632 void register_input_callback(terminalT &input) {
3633 using valueT = std::decay_t<typename terminalT::value_type>;
3634 if (input.is_pull_terminal) {
3640 if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3641 auto move_callback = [
this](
const keyT &key, valueT &&value) {
3642 set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3644 auto send_callback = [
this](
const keyT &key,
const valueT &value) {
3645 set_arg<i, keyT, const valueT &>(key, value);
3647 auto broadcast_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3648 broadcast_arg<i, keyT, valueT>(keylist, value);
3650 auto prepare_send_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3651 prepare_send<i, keyT, valueT>(keylist, value);
3653 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3654 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3655 input.set_callback(send_callback, move_callback, broadcast_callback,
3656 setsize_callback, finalize_callback, prepare_send_callback);
3661 else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3662 auto send_callback = [
this](
const keyT &key) { set_arg<i, keyT, ttg::Void>(key,
ttg::Void{}); };
3663 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3664 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3665 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3674 else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3675 auto move_callback = [
this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3676 auto send_callback = [
this](
const valueT &value) {
3677 if constexpr (std::is_copy_constructible_v<valueT>) {
3678 set_arg<i, keyT, const valueT &>(value);
3681 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") +
typeid(std::decay_t<valueT>).name() +
" which is not copy constructible, std::move datum into send/broadcast statement");
3684 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3685 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3686 auto prepare_send_callback = [
this](
const valueT &value) {
3687 prepare_send<i, void>(value);
3689 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3694 else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3695 auto send_callback = [
this]() { set_arg<i, keyT, ttg::Void>(
ttg::Void{}); };
3696 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3697 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3698 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3708 template <std::size_t... IS>
3709 void register_input_callbacks(std::index_sequence<IS...>) {
3712 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3717 template <std::size_t... IS,
typename inedgesT>
3718 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3719 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3723 template <std::size_t... IS,
typename outedgesT>
3724 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3725 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3730 template <
typename input_terminals_tupleT, std::size_t... IS,
typename flowsT>
3731 void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3733 (*(
const_cast<std::remove_const_t<decltype(flows[IS]-
>flow_flags)> *>(&(flows[IS]->flow_flags))) =
3734 (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3735 : PARSEC_FLOW_ACCESS_RW),
3740 template <
typename input_terminals_tupleT,
typename flowsT>
3741 void initialize_flows(flowsT &&flows) {
3742 _initialize_flows<input_terminals_tupleT>(
3749 static int key_equal(parsec_key_t a, parsec_key_t b,
void *user_data) {
3750 if constexpr (std::is_same_v<keyT, void>) {
3753 keyT &ka = *(
reinterpret_cast<keyT *
>(a));
3754 keyT &kb = *(
reinterpret_cast<keyT *
>(b));
3759 static uint64_t key_hash(parsec_key_t k,
void *user_data) {
3760 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3761 if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3764 keyT &kk = *(
reinterpret_cast<keyT *
>(k));
3766 uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3771 static char *key_print(
char *buffer,
size_t buffer_size, parsec_key_t k,
void *user_data) {
3772 if constexpr (std::is_same_v<keyT, void>) {
3776 keyT kk = *(
reinterpret_cast<keyT *
>(k));
3777 std::stringstream iss;
3779 memset(buffer, 0, buffer_size);
3780 iss.get(buffer, buffer_size);
3785 static parsec_key_t make_key(
const parsec_taskpool_t *tp,
const parsec_assignment_t *as) {
3787 keyT *key = *(keyT**)&(as[2]);
3788 return reinterpret_cast<parsec_key_t
>(key);
3791 static char *parsec_ttg_task_snprintf(
char *buffer,
size_t buffer_size,
const parsec_task_t *parsec_task) {
3792 if(buffer_size == 0)
3795 if constexpr (ttg::meta::is_void_v<keyT>) {
3796 snprintf(buffer, buffer_size,
"%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3798 const task_t *task =
reinterpret_cast<const task_t*
>(parsec_task);
3799 std::stringstream ss;
3802 std::string keystr = ss.str();
3803 std::replace(keystr.begin(), keystr.end(),
'(',
':');
3804 std::replace(keystr.begin(), keystr.end(),
')',
':');
3806 snprintf(buffer, buffer_size,
"%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3811 #if defined(PARSEC_PROF_TRACE)
3812 static void *parsec_ttg_task_info(
void *dst,
const void *
data,
size_t size)
3814 const task_t *task =
reinterpret_cast<const task_t *
>(
data);
3816 if constexpr (ttg::meta::is_void_v<keyT>) {
3817 snprintf(
reinterpret_cast<char*
>(dst),
size,
"()");
3819 std::stringstream ss;
3821 snprintf(
reinterpret_cast<char*
>(dst),
size,
"%s", ss.str().c_str());
3827 parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3829 static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3833 task_t *task = (task_t*)parsec_task;
3835 #ifdef TTG_HAVE_COROUTINE
3837 if (task->suspended_task_address) {
3839 #ifdef TTG_HAVE_DEVICE
3842 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3845 auto dev_data = dev_task.promise();
3847 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3848 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3851 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3854 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3855 task->tt->set_outputs_tls_ptr();
3856 dev_data.do_sends();
3857 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3864 task->suspended_task_address =
nullptr;
3869 for (
int i = 0; i < task->data_count; i++) {
3870 detail::ttg_data_copy_t *copy = task->
copies[i];
3871 if (
nullptr == copy)
continue;
3873 task->copies[i] =
nullptr;
3876 for (
auto& c : task->tt->constraints_complete) {
3877 if constexpr(std::is_void_v<keyT>) {
3883 return PARSEC_HOOK_RETURN_DONE;
3887 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3888 typename priomapT = ttg::detail::default_priomap<keyT>>
3889 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3890 ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3891 :
ttg::
TTBase(name, numinedges, numouts)
3894 , keymap(std::is_same<keymapT,
ttg::detail::default_keymap<keyT>>::value
3895 ? decltype(keymap)(
ttg::detail::default_keymap<keyT>(world))
3896 : decltype(keymap)(std::forward<keymapT>(keymap_)))
3897 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3899 if (innames.size() != numinedges)
throw std::logic_error(
"ttg_parsec::TT: #input names != #input terminals");
3900 if (outnames.size() != numouts)
throw std::logic_error(
"ttg_parsec::TT: #output names != #output terminals");
3902 auto &world_impl = world.
impl();
3903 world_impl.register_op(
this);
3905 if constexpr (numinedges == numins) {
3913 register_input_callbacks(std::make_index_sequence<numinedges>{});
3916 memset(&
self, 0,
sizeof(parsec_task_class_t));
3918 self.name = strdup(
get_name().c_str());
3920 self.nb_parameters = 0;
3923 self.nb_flows = MAX_PARAM_COUNT;
3926 if( world_impl.profiling() ) {
3928 self.nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3930 self.nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3941 self.make_key = make_key;
3942 self.key_functions = &tasks_hash_fcts;
3943 self.task_snprintf = parsec_ttg_task_snprintf;
3945 #if defined(PARSEC_PROF_TRACE)
3946 self.profile_info = &parsec_ttg_task_info;
3949 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
3954 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3955 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
3956 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3957 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3958 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3959 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3960 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3962 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3963 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_HIP;
3964 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3965 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_hip<TT>;
3967 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3968 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3969 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3970 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3972 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3973 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3974 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3975 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3977 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3978 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3979 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3982 self.incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
3983 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CPU;
3984 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
3985 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook<TT>;
3986 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3987 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3988 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3992 self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3993 self.complete_execution = complete_task_and_release;
3995 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3996 parsec_flow_t *flow =
new parsec_flow_t;
3997 flow->name = strdup((std::string(
"flow in") + std::to_string(i)).c_str());
3998 flow->sym_type = PARSEC_SYM_INOUT;
4001 flow->dep_in[0] = NULL;
4002 flow->dep_out[0] = NULL;
4003 flow->flow_index = i;
4004 flow->flow_datatype_mask = ~0;
4005 *((parsec_flow_t **)&(
self.
in[i])) = flow;
4010 for (i = 0; i < MAX_PARAM_COUNT; i++) {
4011 parsec_flow_t *flow =
new parsec_flow_t;
4012 flow->name = strdup((std::string(
"flow out") + std::to_string(i)).c_str());
4013 flow->sym_type = PARSEC_SYM_INOUT;
4014 flow->flow_flags = PARSEC_FLOW_ACCESS_READ;
4015 flow->dep_in[0] = NULL;
4016 flow->dep_out[0] = NULL;
4017 flow->flow_index = i;
4018 flow->flow_datatype_mask = (1 << i);
4019 *((parsec_flow_t **)&(
self.
out[i])) = flow;
4024 self.dependencies_goal = numins;
4027 auto *context = world_impl.context();
4028 for (
int i = 0; i < context->nb_vp; i++) {
4029 nbthreads += context->virtual_processes[i]->nb_cores;
4032 parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t),
sizeof(
task_t),
4033 offsetof(parsec_task_t, mempool_owner), nbthreads);
4042 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4043 typename priomapT = ttg::detail::default_priomap<keyT>>
4044 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4047 std::forward<priomapT>(priomap)) {}
4049 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4050 typename priomapT = ttg::detail::default_priomap<keyT>>
4052 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
ttg::World world,
4053 keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
4054 :
TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
4055 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
4056 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
4058 if constexpr (numinedges > 0) {
4059 register_input_callbacks(std::make_index_sequence<numinedges>{});
4062 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4063 typename priomapT = ttg::detail::default_priomap<keyT>>
4065 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4068 std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4072 if(
nullptr !=
self.name ) {
4073 free((
void*)
self.name);
4074 self.name =
nullptr;
4077 for (std::size_t i = 0; i < numins; ++i) {
4078 if (inpute_reducers_taskclass[i] !=
nullptr) {
4079 std::free(inpute_reducers_taskclass[i]);
4080 inpute_reducers_taskclass[i] =
nullptr;
4088 ttT *op = (
ttT *)cb_data;
4089 if constexpr (!ttg::meta::is_void_v<keyT>) {
4090 std::cout <<
"Left over task " << op->
get_name() <<
" " << task->
key << std::endl;
4092 std::cout <<
"Left over task " << op->
get_name() << std::endl;
4110 parsec_mempool_destruct(&mempools);
4113 free((__parsec_chore_t *)
self.incarnations);
4114 for (
int i = 0; i < MAX_PARAM_COUNT; i++) {
4115 if (NULL !=
self.
in[i]) {
4116 free(
self.
in[i]->name);
4118 self.in[i] =
nullptr;
4120 if (NULL !=
self.
out[i]) {
4121 free(
self.
out[i]->name);
4123 self.out[i] =
nullptr;
4126 world.
impl().deregister_op(
this);
4136 template <std::
size_t i,
typename Reducer>
4139 std::get<i>(input_reducers) = reducer;
4141 parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4142 if (
nullptr == tc) {
4143 tc = (parsec_task_class_t *)std::calloc(1,
sizeof(*tc));
4144 inpute_reducers_taskclass[i] = tc;
4146 tc->name = strdup((
get_name() + std::string(
" reducer ") + std::to_string(i)).c_str());
4148 tc->nb_parameters = 0;
4150 tc->nb_flows = numflows;
4152 auto &world_impl = world.
impl();
4154 if( world_impl.profiling() ) {
4156 tc->nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4158 tc->nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4169 tc->make_key = make_key;
4170 tc->key_functions = &tasks_hash_fcts;
4171 tc->task_snprintf = parsec_ttg_task_snprintf;
4173 #if defined(PARSEC_PROF_TRACE)
4174 tc->profile_info = &parsec_ttg_task_info;
4177 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
4182 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
4183 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
4184 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
4186 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_CPU;
4187 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
4188 ((__parsec_chore_t *)
self.incarnations)[1].hook =
detail::hook;
4189 ((__parsec_chore_t *)
self.incarnations)[2].type = PARSEC_DEV_NONE;
4190 ((__parsec_chore_t *)
self.incarnations)[2].evaluate = NULL;
4191 ((__parsec_chore_t *)
self.incarnations)[2].hook = NULL;
4195 tc->incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
4196 ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4197 ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4198 ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4199 ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4200 ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4201 ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4205 tc->release_task = &parsec_release_task_to_mempool;
4206 tc->complete_execution = NULL;
4217 template <std::
size_t i,
typename Reducer>
4219 set_input_reducer<i>(std::forward<Reducer>(reducer));
4220 set_static_argstream_size<i>(
size);
4225 template <std::
size_t i>
4226 std::tuple_element_t<i, input_terminals_type> *
in() {
4227 return &std::get<i>(input_terminals);
4232 template <std::
size_t i>
4233 std::tuple_element_t<i, output_terminalsT> *
out() {
4234 return &std::get<i>(output_terminals);
4238 template <
typename Key = keyT>
4239 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4242 if constexpr(!std::is_same_v<Key, key_type>) {
4247 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4249 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4250 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4255 template <
typename Key = keyT>
4256 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4260 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
4262 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4263 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4267 template <
typename Key = keyT>
4268 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4272 if constexpr(!std::is_same_v<Key, key_type>) {
4277 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4278 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4283 template <
typename Key = keyT>
4284 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke() {
4287 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4288 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4293 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4300 template<
typename Key,
typename Arg,
typename... Args, std::size_t I, std::size_t... Is>
4301 void invoke_arglist(std::index_sequence<I, Is...>,
const Key& key, Arg&& arg, Args&&... args) {
4302 using arg_type = std::decay_t<Arg>;
4303 if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4308 copy->reset_readers();
4310 set_arg_impl<I>(key, val, copy);
4312 if constexpr (std::is_rvalue_reference_v<Arg>) {
4316 }
else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4317 set_arg<I>(key, std::forward<Arg>(arg));
4319 if constexpr (
sizeof...(Is) > 0) {
4321 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4327 template <
typename Key = keyT,
typename Arg,
typename... Args>
4328 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4329 const Key &key, Arg&& arg, Args&&... args) {
4330 static_assert(
sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4331 "Number of arguments to invoke must match the number of task inputs.");
4334 invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4335 std::forward<Arg>(arg), std::forward<Args>(args)...);
4338 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4339 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4343 m_defer_writer = value;
4347 return m_defer_writer;
4352 world.
impl().register_tt_profiling(
this);
4362 template <
typename Keymap>
4373 template <
typename Priomap>
4375 priomap = std::forward<Priomap>(pm);
4383 template<
typename Devicemap>
4388 devicemap = std::forward<Devicemap>(dm);
4391 devicemap = [=](
const keyT& key) {
4399 throw std::runtime_error(
"Unknown device type!");
4412 template<
typename Constra
int>
4414 std::size_t cid = constraints_check.size();
4415 if constexpr(ttg::meta::is_void_v<keyT>) {
4417 constraints_check.push_back([c,
this](){
return c->check(
this); });
4418 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(
this);
return true; });
4420 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4421 constraints_check.push_back([c,
this](
const keyT& key){
return c->check(key,
this); });
4422 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(key,
this);
return true; });
4428 template<
typename Constra
int>
4431 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4437 template<
typename Constra
int,
typename Mapper>
4439 static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
4440 std::size_t cid = constraints_check.size();
4441 if constexpr(ttg::meta::is_void_v<keyT>) {
4443 constraints_check.push_back([map, c,
this](){
return c->check(map(),
this); });
4444 constraints_complete.push_back([map, c,
this](){ c->complete(map(),
this);
return true; });
4446 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4447 constraints_check.push_back([map, c,
this](
const keyT& key){
return c->check(key, map(key),
this); });
4448 constraints_complete.push_back([map, c,
this](
const keyT& key){ c->complete(key, map(key),
this);
return true; });
4455 template<
typename Constra
int,
typename Mapper>
4458 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4464 MPI_Comm_rank(MPI_COMM_WORLD, &
rank);
4467 auto &world_impl = world.
impl();
4471 auto tp = world_impl.taskpool();
4477 std::vector<static_set_arg_fct_arg_t> tmp;
4478 for (
auto it = se.first; it != se.second;) {
4480 tmp.push_back(std::move(it->second));
4485 for (
auto& it : tmp) {
4488 std::get<1>(it).
get(),
", ", std::get<2>(it),
")");
4489 int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it).get(), std::get<2>(it),
4490 std::get<0>(it), NULL);
4514 bool do_release =
true;
4520 : copy_to_remove(h.copy_to_remove)
4522 h.copy_to_remove =
nullptr;
4528 std::swap(copy_to_remove, h.copy_to_remove);
4533 if (
nullptr != copy_to_remove) {
4541 template <
typename Value>
4542 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&>
operator()(Value &&value) {
4543 constexpr
auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4544 using value_type = std::remove_reference_t<Value>;
4545 static_assert(value_is_rvref ||
4546 std::is_copy_constructible_v<std::decay_t<Value>>,
4547 "Data sent without being moved must be copy-constructible!");
4550 if (
nullptr == caller) {
4551 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4556 value_type *value_ptr = &value;
4557 if (
nullptr == copy) {
4565 value_ptr =
reinterpret_cast<value_type *
>(copy->
get_ptr());
4566 copy_to_remove = copy;
4568 if constexpr (value_is_rvref) {
4576 if constexpr (value_is_rvref)
4577 return std::move(*value_ptr);
4582 template<
typename Value>
4585 if (
nullptr == caller) {
4586 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4590 if (
nullptr == copy) {
4595 copy_to_remove = copy;
4607 template <
typename Value>
4610 if (
nullptr == caller) {
4611 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4615 const Value *value_ptr = &value;
4616 if (
nullptr == copy) {
4624 value_ptr =
reinterpret_cast<Value *
>(copy->
get_ptr());
4625 copy_to_remove = copy;
#define TTG_OP_ASSERT_EXECUTABLE()
Edge is used to connect In and Out terminals.
A base class for all template tasks.
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
auto get_instance_id() const
virtual void make_executable()=0
Marks this executable.
const std::string & get_name() const
Gets the name of this operation.
void register_input_terminals(terminalsT &terms, const namesT &names)
const TTBase * ttg_ptr() const
void register_output_terminals(terminalsT &terms, const namesT &names)
A complete version of void.
Base class for implementation-specific Worlds.
WorldImplBase(int size, int rank)
bool is_valid(void) const
Represents a device in a specific execution space.
void print_incomplete_tasks()
void add_constraint(Constraint c, Mapper &&map)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
ttg::World get_world() const override final
static constexpr int numinvals
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
void set_priomap(Priomap &&pm)
void set_input_reducer(Reducer &&reducer)
std::tuple_element_t< i, input_terminals_type > * in()
void set_devicemap(Devicemap &&dm)
void set_keymap(Keymap &&km)
keymap setter
void add_constraint(std::shared_ptr< Constraint > c)
void finalize_argstream_from_msg(void *data, std::size_t size)
void set_input_reducer(Reducer &&reducer, std::size_t size)
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
bool check_constraints(task_t *task)
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
task_t * create_new_task(const Key &key)
output_terminalsT output_terminals_type
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
static auto & get(InTuple &&intuple)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
void set_defer_writer(bool value)
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
virtual void release() override
bool get_defer_writer(bool value)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
static void ht_iter_cb(void *item, void *cb_data)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
parsec_thread_mempool_t * get_task_mempool(void)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
const auto & get_output_terminals() const
static constexpr bool derived_has_device_op()
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
decltype(keymap) const & get_keymap() const
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
void set_static_argstream_size(std::size_t size)
static constexpr const ttg::Runtime runtime
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
static resultT get(InTuple &&intuple)
static constexpr bool derived_has_cuda_op()
void register_static_op_function(void)
void add_constraint(Constraint &&c)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
void argstream_set_size_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
void set_arg_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
void make_executable() override
Marks this executable.
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
decltype(priomap) const & get_priomap() const
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
std::tuple_element_t< i, output_terminalsT > * out()
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
static constexpr bool derived_has_hip_op()
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
void copy_mark_pushout(const Value &value)
void get_from_pull_msg(void *data, std::size_t size)
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
static constexpr bool derived_has_level_zero_op()
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
actual_input_tuple_type input_args_type
virtual void execute() override
static constexpr int parsec_ttg_rma_tag()
void decrement_inflight_msg()
WorldImpl & operator=(const WorldImpl &other)=delete
const ttg::Edge & ctl_edge() const
void increment_inflight_msg()
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
bool mpi_support(ttg::ExecutionSpace space)
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
virtual bool profiling() override
virtual void dag_off() override
virtual void fence_impl(void) override
virtual void dag_on(const std::string &filename) override
static constexpr int parsec_ttg_tag()
virtual void final_task() override
virtual void destroy() override
virtual void profile_on() override
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
auto * execution_stream()
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
bool complete_transfer(void)
constexpr auto data(C &c) -> decltype(c.data())
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
void set_current(int device, Stream stream)
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
bool all_devices_peer_access
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
std::size_t max_inline_size
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
void foreach_parsec_data(Value &&value, Fn &&fn)
int ttg_device_to_parsec_device(const ttg::device::Device &device)
const parsec_symbol_t parsec_taskclass_param1
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
constexpr const int PARSEC_TTG_MAX_AM_SIZE
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
void transfer_ownership_impl(T &&arg, int device)
const parsec_symbol_t parsec_taskclass_param2
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * create_new_datacopy(Value &&value)
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
void release_data_copy(ttg_data_copy_t *copy)
const parsec_symbol_t parsec_taskclass_param3
const parsec_symbol_t parsec_taskclass_param0
this contains PaRSEC-based TTG functionality
void ttg_fence(ttg::World world)
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
std::mutex static_map_mutex
void ttg_register_callback(ttg::World world, Callback &&callback)
ttg::Edge & ttg_ctl_edge(ttg::World world)
void make_executable_hook(ttg::World &)
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
std::tuple< int, std::unique_ptr< std::byte[]>, size_t > static_set_arg_fct_arg_t
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
ttg::World ttg_default_execution_context()
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
void ttg_execute(ttg::World world)
void ttg_sum(ttg::World world, double &value)
top-level TTG namespace contains runtime-neutral functionality
ExecutionSpace
denotes task execution space
int size(World world=default_execution_context())
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
World default_execution_context()
Accesses the default backend's default execution context.
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
bool tracing()
returns whether tracing is enabled
int rank(World world=default_execution_context())
ttg::World & get_default_world()
void trace(const T &t, const Ts &... ts)
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
value_copy_handler(value_copy_handler &&h)
value_copy_handler & operator=(value_copy_handler &&h)
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
value_copy_handler & operator=(const value_copy_handler &h)=delete
value_copy_handler()=default
Computes hash values for objects of type T.
task that can be resumed after some events occur
parsec_hash_table_t task_constraint_table
parsec_hash_table_t tasks_table
parsec_gpu_task_t * gpu_task
static constexpr std::size_t max_payload_size
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
unsigned char bytes[max_payload_size]
parsec_task_t parsec_task
ttg_data_copy_t ** copies
ttg_parsec_data_flags data_flags
parsec_hash_table_item_t tt_ht_item
std::array< stream_info_t, num_streams > streams
ttg_data_copy_t * copies[num_copies]
lvalue_reference_type value_ref
static void drop_all_ptr()
static void release_task(parsec_ttg_task_base_t *task_base)
static constexpr int mutable_tag
parsec_task_t * get_next_task() const
virtual void * get_ptr()=0
void set_next_task(parsec_task_t *task)
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER