2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
13 #define TTG_PARSEC_DEFER_WRITER false
15 #include "ttg/config.h"
20 #include "../../ttg.h"
57 #include <experimental/type_traits>
72 #if defined(TTG_HAVE_MPI)
74 #if defined(TTG_HAVE_MPIEXT)
81 #include <parsec/class/parsec_hash_table.h>
82 #include <parsec/data_internal.h>
83 #include <parsec/execution_stream.h>
84 #include <parsec/interfaces/interface.h>
85 #include <parsec/mca/device/device.h>
86 #include <parsec/parsec_comm_engine.h>
87 #include <parsec/parsec_internal.h>
88 #include <parsec/scheduling.h>
89 #include <parsec/remote_dep.h>
90 #include <parsec/utils/mca_param.h>
92 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #include <parsec/mca/device/cuda/device_cuda.h>
95 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
96 #include <parsec/mca/device/hip/device_hip.h>
98 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 #include <parsec/mca/device/level_zero/device_level_zero.h>
102 #include <parsec/mca/device/device_gpu.h>
103 #if defined(PARSEC_PROF_TRACE)
104 #include <parsec/profiling.h>
105 #undef PARSEC_TTG_PROFILE_BACKEND
106 #if defined(PARSEC_PROF_GRAPHER)
107 #include <parsec/parsec_prof_grapher.h>
113 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
114 #include <unordered_set>
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
// Operation identifier stored with the message header. It is initialized to
// the largest value representable in uint64_t.
// NOTE(review): presumably that value stands for "not assigned" — confirm.
149 uint64_t
op_id = std::numeric_limits<uint64_t>::max();
170 static void unregister_parsec_tags(
void *_);
183 uint32_t taskpool_id,
188 :
tt_id(fn_id, taskpool_id,
tt_id, param_id, sender, num_keys)
// Communication-engine callback for an incoming message of `size` bytes in
// `data`, sent by rank `src_rank`. Several statements of the body are not
// visible in this excerpt; the notes below cover only what is shown.
194 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag,
void *
data,
long unsigned int size,
195 int src_rank,
void *obj) {
197 parsec_taskpool_t *tp = NULL;
// The operation id is read from the message itself.
199 uint64_t op_id = msg->
op_id;
// Tell the taskpool's termination-detection module that a message is being
// processed, invoke the function stored in op_pair.first on the raw bytes
// (op_pair.second is passed along as its last argument), then report the end.
206 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
207 static_set_arg_fct = op_pair.first;
208 static_set_arg_fct(
data,
size, op_pair.second);
209 tp->tdm.module->incoming_message_end(tp, NULL);
211 }
// Reached when the guarded code threw std::out_of_range. A heap buffer of
// `size` bytes is allocated here.
// NOTE(review): presumably the message is copied and kept for later delivery;
// the lines doing so are not visible — confirm.
catch (
const std::out_of_range &e) {
212 void *data_cpy = malloc(
size);
213 assert(data_cpy != 0);
216 ", ", op_id,
", ", data_cpy,
", ",
size,
")");
// Forward declaration; the definition appears further down in this file.
223 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
224 int src,
void *cb_data);
227 static bool im =
false;
238 bool _task_profiling;
240 mpi_space_support = {
true,
false,
false};
// Asks MPI for the number of processes in MPI_COMM_WORLD; the result is
// written to comm_size.
242 int query_comm_size() {
244 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
// Asks MPI for the rank of the calling process in MPI_COMM_WORLD; the result
// is written to comm_rank.
248 int query_comm_rank() {
250 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
254 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine,
void *user_data)
260 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine,
void *user_data)
267 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
268 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
269 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
270 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
// Constructor. `c` is an optional, already-existing PaRSEC context; it
// defaults to nullptr.
273 WorldImpl(
int *argc,
char **argv[],
int ncores, parsec_context_t *c =
nullptr)
// own_ctx records that no context was supplied by the caller.
276 , own_ctx(c == nullptr)
277 #if defined(PARSEC_PROF_TRACE)
278 , profiling_array(nullptr)
279 , profiling_array_size(0)
281 , _dag_profiling(false)
282 , _task_profiling(false)
// Only when no context was supplied does this object initialize PaRSEC itself.
285 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
289 #
if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
290 || MPIX_Query_cuda_support()
297 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
298 || MPIX_Query_hip_support()
// With profiling compiled in and parsec_profile_enabled set, and with
// PARSEC_TTG_PROFILE_BACKEND defined, three keywords are added to the PaRSEC
// profiling dictionary. Each call fills a start/end pair of event keys held in
// the parsec_ttg_profile_backend_* members.
304 #if defined(PARSEC_PROF_TRACE)
305 if(parsec_profile_enabled) {
307 #if defined(PARSEC_TTG_PROFILE_BACKEND)
308 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_SET_ARG_IMPL",
"fill:000000", 0, NULL,
309 (
int*)&parsec_ttg_profile_backend_set_arg_start,
310 (
int*)&parsec_ttg_profile_backend_set_arg_end);
311 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_BCAST_ARG_IMPL",
"fill:000000", 0, NULL,
312 (
int*)&parsec_ttg_profile_backend_bcast_arg_start,
313 (
int*)&parsec_ttg_profile_backend_bcast_arg_end);
// Unlike the two keywords above, this one declares an info payload of
// sizeof(size_t) bytes, described by the string "size{int64_t}".
314 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_DATACOPY",
"fill:000000",
315 sizeof(
size_t),
"size{int64_t}",
316 (
int*)&parsec_ttg_profile_backend_allocate_datacopy,
317 (
int*)&parsec_ttg_profile_backend_free_datacopy);
322 #ifdef PARSEC_PROF_GRAPHER
325 dot_param_idx = parsec_mca_param_find(
"profile", NULL,
"dot");
327 if (dot_param_idx != PARSEC_ERROR) {
329 parsec_mca_param_lookup_string(dot_param_idx, &filename);
334 if( NULL != parsec_ce.tag_register) {
// Creates and configures the taskpool held in `tpool`. A taskpool must not
// already exist when this code runs.
348 assert(
nullptr == tpool);
349 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
// The id is first set to the largest uint32_t value; parsec_taskpool_reserve_id
// is called on the taskpool a few statements later.
350 tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
352 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
353 tpool->taskpool_name = strdup(
"TTG Taskpool");
354 parsec_taskpool_reserve_id(tpool);
// Build the device mask: one bit per device that parsec_mca_device_get
// returns; indices that yield NULL are skipped.
356 tpool->devices_index_mask = 0;
357 for(
int i = 0; i < (int)parsec_nb_devices; i++) {
358 parsec_device_module_t *device = parsec_mca_device_get(i);
359 if( NULL == device )
continue;
360 tpool->devices_index_mask |= (1 << device->device_index);
// Termination detection: the "user_trigger" module when TTG_USE_USER_TERMDET
// is defined.
// NOTE(review): parsec_termdet_open_dyn_module is presumably the #else branch;
// the #else line is not visible — confirm.
363 #ifdef TTG_USE_USER_TERMDET
364 parsec_termdet_open_module(tpool,
"user_trigger");
366 parsec_termdet_open_dyn_module(tpool);
374 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
377 #if defined(PARSEC_PROF_TRACE)
378 tpool->profiling_array = profiling_array;
// A freshly created taskpool is marked as not started.
387 parsec_taskpool_started =
false;
// Returns the MPI communicator of this world, which is always MPI_COMM_WORLD.
407 MPI_Comm
comm()
const {
return MPI_COMM_WORLD; }
// Starts the taskpool if that has not happened yet: enqueue it in the context,
// add one runtime action, mark it ready with the termination-detection module,
// start the context, and remember that the taskpool is running.
410 if (!parsec_taskpool_started) {
411 parsec_enqueue(ctx, tpool);
412 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
413 tpool->tdm.module->taskpool_ready(tpool);
// The return value of parsec_context_start is captured but may go unused.
414 [[maybe_unused]]
auto ret = parsec_context_start(ctx);
416 parsec_taskpool_started =
true;
421 #if defined(PARSEC_PROF_TRACE)
425 tpool->profiling_array =
nullptr;
427 assert(NULL != tpool->tdm.monitor);
428 tpool->tdm.module->unmonitor_taskpool(tpool);
429 parsec_taskpool_free(tpool);
// Shutdown sequence. Several lines (closing braces and possibly branch
// conditions) are missing from this excerpt, so the nesting of the statements
// below cannot be determined from here.
435 if (parsec_taskpool_started) {
// Remove one runtime action from the taskpool before waiting.
437 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
438 ttg::trace(
"ttg_parsec(", this->
rank(),
"): final waiting for completion");
440 parsec_context_wait(ctx);
442 parsec_taskpool_wait(tpool);
// The communication tags are unregistered either directly or through a
// callback registered with parsec_context_at_fini.
// NOTE(review): the condition choosing between the two is not visible — confirm.
448 unregister_parsec_tags(
nullptr);
450 parsec_context_at_fini(unregister_parsec_tags,
nullptr);
// Release the profiling array and reset its bookkeeping.
452 #if defined(PARSEC_PROF_TRACE)
453 if(
nullptr != profiling_array) {
454 free(profiling_array);
455 profiling_array =
nullptr;
456 profiling_array_size = 0;
// PaRSEC is finalized here only if this object created the context.
459 if (own_ctx) parsec_fini(&ctx);
// Turns on DAG output. With PARSEC_PROF_GRAPHER defined and DAG profiling not
// yet active, the grapher is initialized with the file name
// "<filename>-<rank>.dot" and _dag_profiling is set.
475 virtual void dag_on(
const std::string &filename)
override {
476 #if defined(PARSEC_PROF_GRAPHER)
477 if(!_dag_profiling) {
// The buffer leaves 32 bytes beyond the base name for "-<rank>.dot" and the
// terminator.
// NOTE(review): ext_filename is a variable-length array, which is a compiler
// extension in C++ rather than standard C++.
479 size_t len = strlen(filename.c_str())+32;
480 char ext_filename[len];
481 snprintf(ext_filename, len,
"%s-%d.dot", filename.c_str(),
rank());
482 parsec_prof_grapher_init(ctx, ext_filename);
483 _dag_profiling =
true;
// NOTE(review): this message is presumably the branch taken when
// PARSEC_PROF_GRAPHER is not defined; the #else line is not visible — confirm.
486 ttg::print(
"Error: requested to create '", filename,
"' to create a DAG of tasks,\n"
487 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
492 #if defined(PARSEC_PROF_GRAPHER)
494 parsec_prof_grapher_fini();
495 _dag_profiling =
false;
501 #if defined(PARSEC_PROF_TRACE)
502 _task_profiling =
false;
507 #if defined(PARSEC_PROF_TRACE)
508 _task_profiling =
true;
// Reports the current value of the _task_profiling flag.
512 virtual bool profiling()
override {
return _task_profiling; }
515 return mpi_space_support[
static_cast<std::size_t
>(space)];
519 #ifdef TTG_USE_USER_TERMDET
520 if(parsec_taskpool_started) {
522 parsec_taskpool_started =
false;
527 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
530 #if defined(PARSEC_PROF_TRACE)
531 std::stringstream ss;
532 build_composite_name_rec(t->
ttg_ptr(), ss);
539 #if defined(PARSEC_PROF_TRACE)
540 void build_composite_name_rec(
const ttg::TTBase *t, std::stringstream &ss) {
543 build_composite_name_rec(t->
ttg_ptr(), ss);
// Registers the profiling keyword `name` and stores its two event keys in
// slots 2*position and 2*position+1 of the profiling array.
547 void register_new_profiling_event(
const char *name,
int position) {
// Grow the array when slot 2*position does not fit. The new size is a multiple
// of 64; the newly added tail is zero-filled and the taskpool is pointed at
// the (possibly moved) array.
// NOTE(review): the result of realloc is not checked before it is used.
548 if(2*position >= profiling_array_size) {
549 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
550 profiling_array = (
int*)realloc((
void*)profiling_array,
551 new_profiling_array_size *
sizeof(int));
552 memset((
void*)&profiling_array[profiling_array_size], 0,
sizeof(
int)*(new_profiling_array_size - profiling_array_size));
553 profiling_array_size = new_profiling_array_size;
554 tpool->profiling_array = profiling_array;
// Both slots are expected to still hold zero, i.e. not to have been used.
557 assert(0 == tpool->profiling_array[2*position]);
558 assert(0 == tpool->profiling_array[2*position+1]);
// The keyword declares a 64-byte info payload described as "key{char[64]}".
562 parsec_profiling_add_dictionary_keyword(name,
"fill:000000", 64,
"key{char[64]}",
563 (
int*)&tpool->profiling_array[2*position],
564 (
int*)&tpool->profiling_array[2*position+1]);
570 if (!parsec_taskpool_started) {
571 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool has not been started, fence is a simple MPI_Barrier");
575 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool is ready for completion");
577 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
579 parsec_taskpool_wait(tpool);
// PaRSEC context used by this world.
592 parsec_context_t *ctx =
nullptr;
// True when this object created the context itself; the visible code then
// also calls parsec_fini on it.
593 bool own_ctx =
false;
// Taskpool of this world; nullptr while none exists.
594 parsec_taskpool_t *tpool =
nullptr;
// Set once the taskpool has been enqueued and the context started.
595 bool parsec_taskpool_started =
false;
// Storage for per-event profiling keys (two ints per registered event) and its
// current length in ints.
596 #if defined(PARSEC_PROF_TRACE)
597 int *profiling_array;
598 std::size_t profiling_array_size;
602 static void unregister_parsec_tags(
void *_pidx)
604 if(NULL != parsec_ce.tag_unregister) {
613 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
621 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
629 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
637 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
647 if (task ==
nullptr ||
ptr ==
nullptr) {
652 if (NULL != copy && copy->get_ptr() ==
ptr) {
662 if (task ==
nullptr ||
ptr ==
nullptr) {
667 if (NULL != copy && copy->get_ptr() ==
ptr) {
675 if (task ==
nullptr || copy ==
nullptr) {
679 if (MAX_PARAM_COUNT < task->data_count) {
680 throw std::logic_error(
"Too many data copies, check MAX_PARAM_COUNT!");
692 if (copy == task->
copies[i]) {
702 task->
copies[i] =
nullptr;
706 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
707 #warning "ttg::PaRSEC enables data copy tracking"
708 static std::unordered_set<ttg_data_copy_t *> pending_copies;
709 static std::mutex pending_copies_mutex;
711 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
712 static int64_t parsec_ttg_data_copy_uid = 0;
715 template <
typename Value>
717 using value_type = std::decay_t<Value>;
720 std::is_constructible_v<value_type, decltype(value)>) {
721 copy =
new value_type(std::forward<Value>(value));
726 throw std::logic_error(
"Trying to copy-construct data that is not copy-constructible!");
// Backend profiling of a newly created data copy: record the size of the value
// type and give the copy a unique id taken from the global atomic counter.
728 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
731 copy->size =
sizeof(Value);
732 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
// Remaining arguments of the trace call whose opening line is outside this
// excerpt. The info pointer is the address of copy->size; the text previously
// held a '©' character where "&copy" had been decoded as an HTML entity.
734 static_cast<uint64_t
>(copy->uid),
735 PROFILE_OBJECT_ID_NULL, &copy->size,
736 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
739 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
741 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
742 auto rc = pending_copies.insert(copy);
743 assert(std::get<1>(rc));
750 template <std::size_t... IS,
typename Key = keyT>
752 int junk[] = {0, (invoke_pull_terminal<IS>(
753 std::get<IS>(input_terminals), key, task),
763 parsec_data_transfer_ownership_to_copy(
data, device, PARSEC_FLOW_ACCESS_RW);
765 data->device_copies[0]->version++;
770 template<
typename TT, std::size_t... Is>
775 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<Is, typename TT::input_refs_tuple_type>
> *>(
// Host execution hook for tasks of template-task type TT: forwards to the
// task's invoke_op for the Host execution space.
// NOTE(review): the definition of `me` and what the `if constexpr` on the
// number of input values guards are not visible in this excerpt.
780 template<
typename TT>
781 inline parsec_hook_return_t
hook(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
783 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
786 return me->template invoke_op<ttg::ExecutionSpace::Host>();
// CUDA execution hook: forwards to invoke_op for the CUDA execution space.
// The error path prints a message to std::cerr and returns
// PARSEC_HOOK_RETURN_ERROR.
// NOTE(review): the condition selecting between the two paths is not visible.
789 template<
typename TT>
790 inline parsec_hook_return_t
hook_cuda(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
793 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
795 std::cerr <<
"CUDA hook called without having a CUDA op!" << std::endl;
796 return PARSEC_HOOK_RETURN_ERROR;
// HIP execution hook: forwards to invoke_op for the HIP execution space.
// The error path prints a message to std::cerr and returns
// PARSEC_HOOK_RETURN_ERROR.
// NOTE(review): the condition selecting between the two paths is not visible.
800 template<
typename TT>
801 inline parsec_hook_return_t
hook_hip(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
804 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
806 std::cerr <<
"HIP hook called without having a HIP op!" << std::endl;
807 return PARSEC_HOOK_RETURN_ERROR;
// Level Zero execution hook: forwards to invoke_op for the L0 execution space.
// The error path prints a message to std::cerr and returns
// PARSEC_HOOK_RETURN_ERROR.
// NOTE(review): the condition selecting between the two paths is not visible.
811 template<
typename TT>
812 inline parsec_hook_return_t
hook_level_zero(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
815 return me->template invoke_op<ttg::ExecutionSpace::L0>();
817 std::cerr <<
"L0 hook called without having a L0 op!" << std::endl;
818 return PARSEC_HOOK_RETURN_ERROR;
// Evaluate hook for the CUDA incarnation: forwards to invoke_evaluate for the
// CUDA execution space; the other visible path returns PARSEC_HOOK_RETURN_NEXT.
// NOTE(review): the condition selecting between the two paths is not visible.
823 template<
typename TT>
824 inline parsec_hook_return_t
evaluate_cuda(
const parsec_task_t *parsec_task) {
827 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
829 return PARSEC_HOOK_RETURN_NEXT;
// Evaluate hook for the HIP incarnation: forwards to invoke_evaluate for the
// HIP execution space; the other visible path returns PARSEC_HOOK_RETURN_NEXT.
// NOTE(review): the condition selecting between the two paths is not visible.
833 template<
typename TT>
834 inline parsec_hook_return_t
evaluate_hip(
const parsec_task_t *parsec_task) {
837 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
839 return PARSEC_HOOK_RETURN_NEXT;
843 template<
typename TT>
847 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
849 return PARSEC_HOOK_RETURN_NEXT;
854 template <
typename KeyT,
typename ActivationCallbackT>
856 std::vector<KeyT> _keylist;
857 std::atomic<int> _outstanding_transfers;
858 ActivationCallbackT _cb;
863 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
866 int left = --_outstanding_transfers;
868 _cb(std::move(_keylist), _copy);
// Completion callback for a one-sided get. It unregisters the local memory
// handle, interprets cb_data as an ActivationT, and notifies it through
// complete_transfer(). Returns PARSEC_SUCCESS.
// NOTE(review): what happens when complete_transfer() returns true is not
// visible in this excerpt.
875 template <
typename ActivationT>
876 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
877 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl,
size_t size,
int remote,
879 parsec_ce.mem_unregister(&lreg);
880 ActivationT *activation =
static_cast<ActivationT *
>(cb_data);
881 if (activation->complete_transfer()) {
884 return PARSEC_SUCCESS;
// Active-message callback. The message payload is read as a std::intptr_t
// whose value is reinterpreted as a pointer to a std::function<void(void)>.
// Returns PARSEC_SUCCESS.
// NOTE(review): the lines that use `fn` are not visible; presumably the
// function is invoked and released — confirm.
887 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
888 int src,
void *cb_data) {
889 std::intptr_t *fn_ptr =
static_cast<std::intptr_t *
>(msg);
890 std::function<void(
void)> *fn =
reinterpret_cast<std::function<
void(
void)
> *>(*fn_ptr);
893 return PARSEC_SUCCESS;
// Typed variant of the callback above: the message payload is read as a
// std::intptr_t whose value is reinterpreted as a pointer to FuncT.
// Returns PARSEC_SUCCESS.
// NOTE(review): the lines that use `fn_ptr` are not visible in this excerpt.
896 template <
typename FuncT>
897 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
898 int src,
void *cb_data) {
899 std::intptr_t *iptr =
static_cast<std::intptr_t *
>(msg);
900 FuncT *fn_ptr =
reinterpret_cast<FuncT *
>(*iptr);
903 return PARSEC_SUCCESS;
917 }
else if (readers == 1) {
923 if (1 == readers || readers == copy->
mutable_tag) {
924 std::atomic_thread_fence(std::memory_order_acquire);
937 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
939 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
940 size_t rc = pending_copies.erase(copy);
// Backend profiling of a data copy being released. These are the remaining
// arguments of a trace call whose opening line is outside this excerpt. The
// info pointer is the address of copy->size; the text previously held a '©'
// character where "&copy" had been decoded as an HTML entity.
944 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
948 static_cast<uint64_t
>(copy->uid),
949 PROFILE_OBJECT_ID_NULL, &copy->size,
950 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
958 template <
typename Value>
961 bool replace =
false;
963 assert(readers != 0);
967 bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->
defer_writer;
993 }
else if (!readonly) {
1007 if (1 == copy_in->
num_readers() && !defer_writer) {
1014 std::atomic_thread_fence(std::memory_order_release);
1028 if (NULL == copy_res) {
1030 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1037 for (
int i = 0; i < deferred_op->
data_count; ++i) {
1038 if (deferred_op->
copies[i] == copy_in) {
1039 deferred_op->
copies[i] = new_copy;
1052 copy_res = new_copy;
1056 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") +
typeid(std::decay_t<Value>).name() +
" but the type is not copyable");
// Initialization fragment. Calling it a second time is rejected with a
// std::runtime_error.
1065 if (
detail::initialized_mpi())
throw std::runtime_error(
"ttg_parsec::ttg_initialize: can only be called once");
// MPI is initialized here only if nobody has done so yet, requesting
// MPI_THREAD_MULTIPLE.
// NOTE(review): the check on `provided` that guards the throw below is not
// visible in this excerpt.
1068 int mpi_initialized;
1069 MPI_Initialized(&mpi_initialized);
1070 if (!mpi_initialized) {
1072 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
1074 throw std::runtime_error(
"ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
// Walk all PaRSEC devices and query whether each is a GPU; an inconsistency
// raises a std::runtime_error (the exact condition is not visible here).
1087 for (
int i = 0; i < parsec_nb_devices; ++i) {
1088 bool is_gpu = parsec_mca_device_is_gpu(i);
1092 throw std::runtime_error(
"PaRSEC: Found non-GPU device in GPU ID range!");
// Optional override read from the TTG_MAX_INLINE environment variable, parsed
// with std::atol.
1097 const char* ttg_max_inline_cstr = std::getenv(
"TTG_MAX_INLINE");
1098 if (
nullptr != ttg_max_inline_cstr) {
1099 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
// Determine whether the GPU devices can access each other: for every GPU
// device i, bit j of its peer_access_mask is tested for every GPU device j.
// Both loops stop as soon as one test fails.
1105 bool all_peer_access =
true;
1107 for (
int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1108 parsec_device_module_t *idevice = parsec_mca_device_get(i);
1109 if (PARSEC_DEV_IS_GPU(idevice->type)) {
1110 parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
1111 for (
int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1113 parsec_device_module_t *jdevice = parsec_mca_device_get(j);
1114 if (PARSEC_DEV_IS_GPU(jdevice->type)) {
1115 all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true :
false;
1130 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1139 template <
typename T>
1141 world.
impl().register_ptr(
ptr);
1144 template <
typename T>
1146 world.
impl().register_ptr(std::move(
ptr));
1150 world.
impl().register_status(status_ptr);
1153 template <
typename Callback>
1155 world.
impl().register_callback(std::forward<Callback>(callback));
1161 double result = 0.0;
1162 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.
impl().comm());
1167 MPI_Barrier(world.
impl().comm());
1172 template <
typename T>
1175 if (world.
rank() == source_rank) {
1178 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.
impl().comm());
1180 unsigned char *buf =
new unsigned char[BUFLEN];
1181 if (world.
rank() == source_rank) {
1184 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.
impl().comm());
1185 if (world.
rank() != source_rank) {
1198 parsec_task_class_t
self;
1203 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs, ttg::ExecutionSpace Space>
1207 static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1208 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1210 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1213 static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1214 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1215 static_assert((ttg::meta::none_has_reference_v<input_valueTs>),
"Input typelist cannot contain reference types");
1216 static_assert(ttg::meta::is_none_Void_v<input_valueTs>,
"ttg::Void is for internal use only, do not use it");
1218 parsec_mempool_t mempools;
1221 template <
typename T>
1222 using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1224 template <
typename T>
1225 using have_hip_op_non_type_t = decltype(T::have_hip_op);
1227 template <
typename T>
1228 using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
// Compile-time sizes derived from the template arguments:
// numinedges - number of elements in input_tuple_type,
// numins     - number of elements in actual_input_tuple_type,
// numouts    - number of output terminals,
// numflows   - the larger of numins and numouts.
1232 static constexpr
int numinedges = std::tuple_size_v<input_tuple_type>;
1233 static constexpr
int numins = std::tuple_size_v<actual_input_tuple_type>;
1234 static constexpr
int numouts = std::tuple_size_v<output_terminalsT>;
1235 static constexpr
int numflows = std::max(numins, numouts);
1239 template<
typename DerivedT = derivedT>
1245 template<
typename DerivedT = derivedT>
1251 template<
typename DerivedT = derivedT>
1257 template<
typename DerivedT = derivedT>
1259 return (derived_has_cuda_op<DerivedT>() ||
1260 derived_has_hip_op<DerivedT>() ||
1261 derived_has_level_zero_op<DerivedT>());
1266 "Data sent from a device-capable template task must be serializable.");
1275 ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1277 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1282 std::tuple_size_v<input_refs_tuple_type>;
// Returns element i of the forwarded tuple, converted to resultT with
// static_cast.
1288 template <std::
size_t i,
typename resultT,
typename InTuple>
1289 static resultT
get(InTuple &&intuple) {
1290 return static_cast<resultT
>(std::get<i>(std::forward<InTuple>(intuple)));
// Overload without an explicit result type: returns a reference to element i
// of the forwarded tuple.
1292 template <std::
size_t i,
typename InTuple>
1293 static auto &
get(InTuple &&intuple) {
1294 return std::get<i>(std::forward<InTuple>(intuple));
1304 constexpr
static const size_t task_key_offset =
sizeof(task_t);
1307 output_terminalsT output_terminals;
// Compile-time dispatch tables. Each make_* helper expands an index sequence
// into an array holding one entry per index; the arrays below are initialized
// from those helpers.

// Table of pointers to the set_arg_from_msg<I> member functions, one per
// input (numins entries).
1313 template <std::size_t... IS>
1314 static constexpr
auto make_set_args_fcts(std::index_sequence<IS...>) {
1315 using resultT = decltype(set_arg_from_msg_fcts);
1316 return resultT{{&TT::set_arg_from_msg<IS>...}};
1318 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_arg_from_msg_fcts =
1319 make_set_args_fcts(std::make_index_sequence<numins>{});
// Table of pointers to the argstream_set_size_from_msg<I> member functions
// (numins entries).
1321 template <std::size_t... IS>
1322 static constexpr
auto make_set_size_fcts(std::index_sequence<IS...>) {
1323 using resultT = decltype(set_argstream_size_from_msg_fcts);
1324 return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
1326 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1327 make_set_size_fcts(std::make_index_sequence<numins>{});
// Table of pointers to the finalize_argstream_from_msg<I> member functions
// (numins entries).
1329 template <std::size_t... IS>
1330 static constexpr
auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1331 using resultT = decltype(finalize_argstream_from_msg_fcts);
1332 return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
1334 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1335 make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
// Table of pointers to the get_from_pull_msg<I> member functions. This table
// has numinedges entries, not numins.
1337 template <std::size_t... IS>
1338 static constexpr
auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1339 using resultT = decltype(get_from_pull_msg_fcts);
1340 return resultT{{&TT::get_from_pull_msg<IS>...}};
1342 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1343 make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
// Per-input flags: entry I is true when element I of input_args_type is
// const-qualified.
1345 template<std::size_t... IS>
1346 constexpr
static auto make_input_is_const(std::index_sequence<IS...>) {
1347 using resultT = decltype(input_is_const);
1348 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1350 constexpr
static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
// User-configurable maps keyed by the task key. keymap and priomap share the
// same callable type; devicemap yields a ttg::device::Device.
// NOTE(review): presumably keymap selects the owning rank and priomap the
// task priority — confirm against their call sites.
1353 ttg::meta::detail::keymap_t<keyT> keymap;
1354 ttg::meta::detail::keymap_t<keyT> priomap;
1355 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
// Reducer state, one slot per input. Note that the brace initializers below
// set only the first array element explicitly; the remaining elements are
// value-initialized.
1357 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1359 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = {
nullptr };
1360 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1361 int num_pullins = 0;
// Constraint callbacks kept in two separate lists.
1365 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1366 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
// Forwards all arguments to the op() of the derived class (reached through a
// static_cast of this). A void-returning derived op is simply called; a
// non-void result is returned to the caller.
// NOTE(review): what is returned in the void case is not visible here.
1375 template <
typename... Args>
1376 auto op(Args &&...args) {
1377 derivedT *derived =
static_cast<derivedT *
>(
this);
1378 using return_type = decltype(derived->op(std::forward<Args>(args)...));
1379 if constexpr (std::is_same_v<return_type,void>) {
1380 derived->op(std::forward<Args>(args)...);
1384 return derived->op(std::forward<Args>(args)...);
// Handles input terminal i for `key` if it is a pull terminal. When the
// container's owner of `key` is another rank, the value is requested from that
// rank; the other visible path reads the value from the local container and
// passes it to set_arg<i>.
// NOTE(review): the `else` separating the two paths is not visible — confirm.
1388 template <std::
size_t i,
typename terminalT,
typename Key>
1389 void invoke_pull_terminal(terminalT &
in,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1390 if (
in.is_pull_terminal) {
1391 auto owner =
in.container.owner(key);
1392 if (owner != world.rank()) {
1393 get_pull_terminal_data_from<i>(owner, key);
1396 set_arg<i>(key, (
in.container).get(key));
// Sends an active message to rank `owner` for input i. The message is built
// for this TT instance and the world's taskpool, the key is packed into its
// byte buffer, the termination-detection module is told about the outgoing
// message, and the header plus `pos` payload bytes are sent on the world's
// TTG tag.
1401 template <std::
size_t i,
typename Key>
1402 void get_pull_terminal_data_from(
const int owner,
1404 using msg_t = detail::msg_t;
1405 auto &world_impl = world.impl();
1406 parsec_taskpool_t *tp = world_impl.taskpool();
1407 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), tp->taskpool_id,
1412 pos =
pack(key, msg->bytes, pos);
1413 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1414 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1415 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
1416 sizeof(msg_header_t) + pos);
1419 template <std::size_t... IS,
typename Key = keyT>
1420 void invoke_pull_terminals(std::index_sequence<IS...>,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1421 int junk[] = {0, (invoke_pull_terminal<IS>(
1422 std::get<IS>(input_terminals), key, task),
1427 template <std::size_t... IS>
1428 static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1430 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>
> *>(
1431 task->copies[IS]->get_ptr()))...};
1434 #ifdef TTG_HAVE_DEVICE
// Submit callback installed on the GPU task (see the assignment to
// gpu_task->submit in device_static_evaluate). It resumes the task's suspended
// device coroutine on the stream supplied by PaRSEC and reports whether the
// task is finished or has to be submitted again.
1438 static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
1439 parsec_gpu_task_t *gpu_task,
1440 parsec_gpu_exec_stream_t *gpu_stream) {
1442 task_t *task = (task_t*)gpu_task->ec;
// Rebuild the coroutine handle from the stored address and remember the stream.
1444 ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1446 task->dev_ptr->stream = gpu_stream;
1451 auto dev_data = dev_task.promise();
// On entry the coroutine must be waiting for a transfer or for a kernel.
1454 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1455 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
// Backend-specific view of the stream; which branch is compiled depends on
// the PaRSEC and TTG device support macros.
1457 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1459 parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
1463 #elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1465 parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
1469 #elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1471 parsec_level_zero_exec_stream_t *stream;
1472 stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
// Run the task body (resumes the coroutine).
1479 static_op(&task->parsec_task);
// Helper that marks flows as read-only and sets the reader count of their
// output copies to 1.
// NOTE(review): the condition selecting which of the MAX_PARAM_COUNT flows are
// touched is not visible in this excerpt.
1483 auto discard_tmp_flows = [&](){
1484 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1487 const_cast<parsec_flow_t*
>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
1488 task->parsec_task.data[i].data_out->readers = 1;
// Default result is DONE. If the coroutine is still suspended, its state is
// re-read: in the send-out or complete state the temporary flows are
// discarded; PARSEC_HOOK_RETURN_AGAIN is the other visible outcome.
// NOTE(review): the else structure is not visible — confirm.
1494 int rc = PARSEC_HOOK_RETURN_DONE;
1495 if (
nullptr != task->suspended_task_address) {
1497 dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1498 dev_data = dev_task.promise();
1500 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1501 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1502 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1504 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1505 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1508 discard_tmp_flows();
1511 rc = PARSEC_HOOK_RETURN_AGAIN;
1516 discard_tmp_flows();
// Evaluate hook for device tasks. On the first call for a task (no GPU task
// attached yet) it creates and fills the GPU task, gives the task a private
// copy of its task class whose flows come from the GPU task, and returns
// PARSEC_HOOK_RETURN_DONE. A task that already has a GPU task is reported as
// an error.
1521 static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
1523 task_t *task = (task_t*)parsec_task;
1524 if (task->dev_ptr->gpu_task ==
nullptr) {
1527 parsec_gpu_task_t *gpu_task;
// Zero-initialized allocation, constructed as a PaRSEC list item.
1529 gpu_task =
static_cast<parsec_gpu_task_t*
>(std::calloc(1,
sizeof(*gpu_task)));
1530 PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
1531 gpu_task->ec = parsec_task;
1532 gpu_task->task_type = 0;
1533 gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max();
1534 gpu_task->pushout = 0;
1535 gpu_task->submit = &TT::device_static_submit;
1544 task->dev_ptr->gpu_task = gpu_task;
1547 task->parsec_task.chore_mask = PARSEC_DEV_ALL;
// Per-task copy of the task class, so the flow tables can be changed for this
// task only.
1550 task->dev_ptr->task_class = *task->parsec_task.task_class;
// Run the task body once.
// NOTE(review): presumably this lets the coroutine declare the data it needs
// before the flows are read below — confirm.
1553 static_op(parsec_task);
1557 parsec_task_class_t& tc = task->dev_ptr->task_class;
// Every flow slot of the copied task class is taken from the GPU task, used
// for both input and output.
1560 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1561 tc.in[i] = gpu_task->flow[i];
1562 tc.out[i] = gpu_task->flow[i];
1564 tc.nb_flows = MAX_PARAM_COUNT;
// With a device map set, data of writable flows that is currently owned by
// device 0 is advised to prefer parsec_dev.
// NOTE(review): how parsec_dev is computed is not visible in this excerpt.
1568 if (tt->devicemap) {
1570 if constexpr (std::is_void_v<keyT>) {
1575 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1577 if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
1578 parsec_data_t *
data = parsec_task->data[i].data_in->original;
1582 if (
data->owner_device == 0) {
1583 parsec_advise_data_on_device(
data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
// From here on the task uses its private task class.
1590 task->parsec_task.task_class = &task->dev_ptr->task_class;
1593 return PARSEC_HOOK_RETURN_DONE;
1596 std::cerr <<
"EVALUATE called on task with assigned GPU task!" << std::endl;
1599 return PARSEC_HOOK_RETURN_ERROR;
// Execution hook for device tasks. A task without a suspended coroutine, or
// whose coroutine is already in the send-out or complete state, is reported as
// DONE. Otherwise the GPU task is handed to PaRSEC's device kernel scheduler
// for the selected device.
1603 static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
1608 task_t *task = (task_t*)parsec_task;
1610 if (
nullptr == task->suspended_task_address) {
1612 return PARSEC_HOOK_RETURN_DONE;
1616 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1619 ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1621 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1622 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1624 return PARSEC_HOOK_RETURN_DONE;
// A device must have been selected for the task by this point.
1627 parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1628 assert(NULL != device);
1630 task->dev_ptr->device = device;
1631 parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1632 parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
// Every supported device type uses the default stage-in/stage-out functions
// and the same scheduler entry point; each case is compiled only when PaRSEC
// was built with that device support.
1634 switch(device->super.type) {
1636 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1637 case PARSEC_DEV_CUDA:
1641 gpu_task->stage_in = parsec_default_gpu_stage_in;
1642 gpu_task->stage_out = parsec_default_gpu_stage_out;
1643 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1647 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1648 case PARSEC_DEV_HIP:
1650 gpu_task->stage_in = parsec_default_gpu_stage_in;
1651 gpu_task->stage_out = parsec_default_gpu_stage_out;
1652 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1656 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1657 case PARSEC_DEV_LEVEL_ZERO:
1659 gpu_task->stage_in = parsec_default_gpu_stage_in;
1660 gpu_task->stage_out = parsec_default_gpu_stage_out;
1661 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
// Reached for a device type that none of the compiled-in cases handled: the
// mismatch is reported and the task is still returned as DONE.
1668 ttg::print_error(task->tt->get_name(),
" : received mismatching device type ", (
int)device->super.type,
" from PaRSEC");
1670 return PARSEC_HOOK_RETURN_DONE;
// Runs (or resumes) the body of a task that has input values. Many lines of
// this function are missing from the excerpt; the notes describe only the
// visible statements.
1674 static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1676 task_t *task = (task_t*)parsec_task;
1677 void* suspended_task_address =
1678 #ifdef TTG_HAVE_COROUTINE
1679 task->suspended_task_address;
// No suspended coroutine: this is the first invocation of the task body.
1684 if (suspended_task_address ==
nullptr) {
1686 ttT *baseobj = task->tt;
1687 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1690 if (obj->tracing()) {
1691 if constexpr (!ttg::meta::is_void_v<keyT>)
1692 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": executing");
1694 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : executing");
// Call op() with the argument combination matching the TT: with or without a
// key, with or without a tuple of input references. The result is processed by
// TTG_PROCESS_TT_OP_RETURN together with suspended_task_address and the task's
// coroutine id.
1697 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1698 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1699 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
1700 }
else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1701 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1702 }
else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1703 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1704 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
1705 }
else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
// Resumption of a previously suspended coroutine (compiled only with
// coroutine support).
1714 #ifdef TTG_HAVE_COROUTINE
1717 #ifdef TTG_HAVE_DEVICE
1719 ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
// The TT's thread-local output pointer is switched to this TT around the
// resumption and restored from the saved value afterwards.
1723 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1724 task->tt->set_outputs_tls_ptr();
// A completed coroutine clears the suspended address.
1726 if (coro.completed()) {
1728 suspended_task_address =
nullptr;
1730 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
// NOTE(review): the declarations of `ret` and `events` used below are not
// visible in this excerpt.
1736 assert(ret.ready());
1737 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1738 task->tt->set_outputs_tls_ptr();
1740 if (ret.completed()) {
1742 suspended_task_address =
nullptr;
1750 for (
auto &event_ptr : events) {
1751 event_ptr->finish();
1755 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1757 task->suspended_task_address = suspended_task_address;
// Store the (possibly cleared) coroutine address back into the task.
1765 #ifdef TTG_HAVE_COROUTINE
1766 task->suspended_task_address = suspended_task_address;
// With no coroutine left suspended, emit the completion trace.
1768 if (suspended_task_address ==
nullptr) {
1769 ttT *baseobj = task->tt;
1770 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1771 if (obj->tracing()) {
1772 if constexpr (!ttg::meta::is_void_v<keyT>)
1773 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": done executing");
1775 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1779 return PARSEC_HOOK_RETURN_DONE;
// PaRSEC execution hook for a task whose op() is called without input values:
// the visible call passes only the task key (for non-void keyT) and the
// output terminals.
// NOTE(review): this listing omits several lines of the function (the
// non-coroutine branches, the void-key op() call and the closing braces);
// the comments below describe only the statements that are visible.
1782 static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1783 task_t *task =
static_cast<task_t*
>(parsec_task);
// Address of a suspended coroutine, taken from the task when
// TTG_HAVE_COROUTINE is defined.
1785 void* suspended_task_address =
1786 #ifdef TTG_HAVE_COROUTINE
1787 task->suspended_task_address;
// A null address means there is nothing to resume: run the op from scratch.
1791 if (suspended_task_address ==
nullptr) {
// Both pointers are derived from the same type-erased object_ptr.
1792 ttT *baseobj = (
ttT *)task->object_ptr;
1793 derivedT *obj = (derivedT *)task->object_ptr;
1796 if constexpr (!ttg::meta::is_void_v<keyT>) {
// The macro receives the suspension address and coroutine id so that it can
// record the outcome of the op() call in them.
1797 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1798 }
else if constexpr (ttg::meta::is_void_v<keyT>) {
1805 #ifdef TTG_HAVE_COROUTINE
// NOTE(review): `ret` is declared on a line missing from this listing;
// presumably it is the coroutine handle rebuilt from the stored address.
1807 assert(ret.ready());
// A completed coroutine no longer needs to be resumed: clear the address.
1809 if (ret.completed()) {
1811 suspended_task_address =
nullptr;
// Persist the (possibly cleared) address in the task for the next invocation.
1820 task->suspended_task_address = suspended_task_address;
// A non-null address means the task is still suspended; the visible return
// values are AGAIN (in this branch, presumably) and DONE otherwise.
1822 if (suspended_task_address) {
1825 return PARSEC_HOOK_RETURN_AGAIN;
1828 return PARSEC_HOOK_RETURN_DONE;
// PaRSEC execution hook of a reducer task for streaming input terminal `i`.
// It folds pending data copies of the parent task's stream `i` into the
// parent's target copy using the reducer registered for that terminal, and
// releases the parent task once the stream has reached its goal.
// NOTE(review): this listing omits several lines of the function (among them
// the declaration of `c`, the opening of the do-loop and the else-branches);
// the comments below describe only the statements that are visible.
1831 template <std::
size_t i>
1832 static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1833 using rtask_t = detail::reducer_task_t;
1834 using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1835 constexpr
const bool val_is_void = ttg::meta::is_void_v<value_t>;
1836 constexpr
const bool input_is_const = std::is_const_v<value_t>;
// The reducer task only carries a pointer to the task it reduces for; the TT
// object is reached through that parent task.
1837 rtask_t *rtask = (rtask_t*)parsec_task;
1838 task_t *parent_task =
static_cast<task_t*
>(rtask->parent_task);
1839 ttT *baseobj = parent_task->tt;
1840 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1842 auto& reducer = std::get<i>(baseobj->input_reducers);
1846 if (obj->tracing()) {
1847 if constexpr (!ttg::meta::is_void_v<keyT>)
1848 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": reducer executing");
1850 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : reducer executing");
// The value accumulated so far lives in the parent's copy slot for terminal i;
// it must exist unless the stream carries no values (void).
1854 detail::ttg_data_copy_t *target_copy;
1855 target_copy = parent_task->copies[i];
1856 assert(val_is_void ||
nullptr != target_copy);
1859 std::size_t
size = 0;
1860 assert(parent_task->streams[i].reduce_count > 0);
// For the first reducer task: if decrementing reduce_count brings it to zero
// there is nothing to fold, so the hook reports completion right away.
1861 if (rtask->is_first) {
1862 if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1864 if (obj->tracing()) {
1865 if constexpr (!ttg::meta::is_void_v<keyT>)
1866 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": first reducer empty");
1868 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : first reducer empty");
1871 return PARSEC_HOOK_RETURN_DONE;
// Non-void stream: take one pending copy off the stream's LIFO and fold it
// into the target with reducer(target, source).
1879 if constexpr(!val_is_void) {
1881 detail::ttg_data_copy_t *source_copy;
1882 parsec_list_item_t *item;
1883 item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
// NOTE(review): the handling of an empty LIFO is on lines missing from this listing.
1884 if (
nullptr == item) {
1888 source_copy = ((detail::ttg_data_copy_self_t *)(item))->
self;
// The target must be tagged as mutable and the source must still be
// referenced by at least one reader before the reducer is invoked.
1889 assert(target_copy->num_readers() == target_copy->mutable_tag);
1890 assert(source_copy->num_readers() > 0);
1891 reducer(*
reinterpret_cast<std::decay_t<value_t> *
>(target_copy->get_ptr()),
1892 *
reinterpret_cast<std::decay_t<value_t> *
>(source_copy->get_ptr()));
1894 }
else if constexpr(val_is_void) {
// Void stream: the only visible statement bumps the stream's size counter.
1898 size = ++parent_task->streams[i].size;
// Keep going while the decremented reduce_count is still positive; `c` holds
// the value of the counter after this task's decrement.
1900 }
while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
// The stream is complete once `size` has reached the stream's goal.
1904 bool complete = (
size >= parent_task->streams[i].goal);
// Only when the goal is reached and no reductions remain (c == 0) is the
// parent task released; for a const input the reader count of the target is
// reset first.
1909 if (complete && c == 0) {
1910 if constexpr(input_is_const) {
1912 target_copy->reset_readers();
1915 parent_task->remove_from_hash =
true;
1916 parent_task->release_task(parent_task);
1921 if (obj->tracing()) {
1922 if constexpr (!ttg::meta::is_void_v<keyT>)
1923 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": done executing");
1925 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1928 return PARSEC_HOOK_RETURN_DONE;
// Deserializes `obj` from the byte buffer `_bytes`, starting at offset `pos`,
// through the data-descriptor type `dd_t`.
// NOTE(review): the definition of `dd_t`, the body of the variable-size branch
// and the return statement are on lines missing from this listing; presumably
// the updated offset is returned (callers assign the result back to `pos`).
1933 template <
typename T>
1934 uint64_t
unpack(T &obj,
void *_bytes, uint64_t pos) {
1936 uint64_t payload_size;
// Types whose serialized size is not a compile-time constant get special
// treatment here (body not visible in this listing).
1937 if constexpr (!dd_t::serialize_size_is_const) {
// Otherwise the descriptor itself reports the payload size of `obj`.
1940 payload_size = dd_t::payload_size(&obj);
// Read `payload_size` bytes into `obj`; the descriptor returns the new offset.
1942 pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
1946 template <
typename T>
1949 uint64_t payload_size = dd_t::payload_size(&obj);
1950 if constexpr (!dd_t::serialize_size_is_const) {
1953 pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
1959 "Trying to unpack as message that does not hold enough bytes to represent a single header");
1961 derivedT *obj =
reinterpret_cast<derivedT *
>(bop);
1962 switch (hd->
fn_id) {
1966 assert(hd->
param_id < obj->set_arg_from_msg_fcts.size());
1967 auto member = obj->set_arg_from_msg_fcts[hd->
param_id];
1977 assert(hd->
param_id < obj->set_argstream_size_from_msg_fcts.size());
1978 auto member = obj->set_argstream_size_from_msg_fcts[hd->
param_id];
1984 assert(hd->
param_id < obj->finalize_argstream_from_msg_fcts.size());
1985 auto member = obj->finalize_argstream_from_msg_fcts[hd->
param_id];
1991 assert(hd->
param_id < obj->get_from_pull_msg_fcts.size());
1992 auto member = obj->get_from_pull_msg_fcts[hd->
param_id];
2003 auto &world_impl = world.impl();
2004 parsec_execution_stream_s *es = world_impl.execution_stream();
2005 int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
2006 return &mempools.thread_mempools[index];
2009 template <
size_t i,
typename valueT>
2013 parsec_execution_stream_s *es = world.impl().execution_stream();
2015 dummy =
new (parsec_thread_mempool_allocate(mempool))
task_t(mempool, &this->
self,
this);
2023 dummy->
parsec_task.taskpool = world.impl().taskpool();
2030 parsec_task_t *task_ring =
nullptr;
2031 for (
auto &&key : keylist) {
2033 if constexpr (std::is_copy_constructible_v<valueT>) {
2034 set_arg_local_impl<i>(key, *
reinterpret_cast<valueT *
>(copy->
get_ptr()), copy, &task_ring);
2038 static_assert(!std::is_reference_v<valueT>);
2040 set_arg_local_impl<i>(key, std::move(*
reinterpret_cast<valueT *
>(copy->
get_ptr())), copy, &task_ring);
2042 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") +
typeid(std::decay_t<valueT>).name() +
" but the type is not copyable");
2047 if (
nullptr != task_ring) {
2048 auto &world_impl = world.impl();
2049 parsec_task_t *vp_task_ring[1] = { task_ring };
2050 __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2057 complete_task_and_release(es, &dummy->
parsec_task);
2058 parsec_thread_mempool_free(mempool, &dummy->
parsec_task);
2070 template <std::
size_t i>
2072 using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2074 msg_t *msg =
static_cast<msg_t *
>(
data);
2075 if constexpr (!ttg::meta::is_void_v<keyT>) {
2079 uint64_t key_end_pos;
2080 std::vector<keyT> keylist;
2081 int num_keys = msg->tt_id.num_keys;
2082 keylist.reserve(num_keys);
2083 auto rank = world.rank();
2084 for (
int k = 0; k < num_keys; ++k) {
2086 pos =
unpack(key, msg->bytes, pos);
2087 assert(keymap(key) ==
rank);
2088 keylist.push_back(std::move(key));
2094 if constexpr (!ttg::meta::is_void_v<valueT>) {
2095 using decvalueT = std::decay_t<valueT>;
2096 int32_t num_iovecs = msg->tt_id.num_iovecs;
2101 using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2104 metadata_t metadata;
2105 pos =
unpack(metadata, msg->bytes, pos);
2114 parsec_gpu_data_copy_t* gpu_elem;
2115 gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2118 while (i < parsec_nb_devices) {
2119 if (
nullptr == gpu_elem) {
2120 gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2121 gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2122 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2123 gpu_elem->version = 0;
2124 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2126 if (
nullptr == gpu_elem->device_private) {
2127 gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2128 if (
nullptr == gpu_elem->device_private) {
2137 pos =
unpack(*
static_cast<decvalueT *
>(copy->
get_ptr()), msg->bytes, pos);
2140 if (num_iovecs == 0) {
2141 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2146 int remote = msg->tt_id.sender;
2147 assert(remote < world.size());
2149 auto &val = *
static_cast<decvalueT *
>(copy->
get_ptr());
2151 bool inline_data = msg->tt_id.inline_data;
2154 parsec_ce_tag_t cbtag;
2156 auto create_activation_fn = [&]() {
2158 std::memcpy(&cbtag, msg->bytes + pos,
sizeof(cbtag));
2159 pos +=
sizeof(cbtag);
2164 set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2165 this->world.impl().decrement_inflight_msg();
2169 auto read_inline_data = [&](
auto&& iovec){
2172 std::memcpy(iovec.data, msg->bytes + pos, iovec.num_bytes);
2173 pos += iovec.num_bytes;
2175 auto handle_iovec_fn = [&](
auto&& iovec,
auto activation) {
2176 using ActivationT = std::decay_t<decltype(*activation)>;
2179 parsec_ce_mem_reg_handle_t rreg;
2180 int32_t rreg_size_i;
2181 std::memcpy(&rreg_size_i, msg->bytes + pos,
sizeof(rreg_size_i));
2182 pos +=
sizeof(rreg_size_i);
2183 rreg =
static_cast<parsec_ce_mem_reg_handle_t
>(msg->bytes + pos);
2187 std::intptr_t fn_ptr;
2188 std::memcpy(&fn_ptr, msg->bytes + pos,
sizeof(fn_ptr));
2189 pos +=
sizeof(fn_ptr);
2192 parsec_ce_mem_reg_handle_t lreg;
2194 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2195 iovec.num_bytes, &lreg, &lreg_size);
2196 world.impl().increment_inflight_msg();
2199 parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iovec.num_bytes, remote,
2200 &detail::get_complete_cb<ActivationT>, activation,
2202 cbtag, &fn_ptr,
sizeof(std::intptr_t));
2207 for (
auto&& iov : descr.get_data(val)) {
2208 read_inline_data(iov);
2211 auto activation = create_activation_fn();
2212 for (
auto&& iov : descr.get_data(val)) {
2213 handle_iovec_fn(iov, activation);
2222 auto activation = create_activation_fn();
2224 handle_iovec_fn(
ttg::iovec{
data->nb_elts,
data->device_copies[
data->owner_device]->device_private}, activation);
2229 assert(num_iovecs == nv);
2233 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2237 }
else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2238 for (
auto &&key : keylist) {
2239 set_arg<i, keyT, ttg::Void>(key,
ttg::Void{});
2243 }
else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2244 using decvalueT = std::decay_t<valueT>;
2247 unpack(val, msg->bytes, 0);
2248 set_arg<i, keyT, valueT>(std::move(val));
2250 }
else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2251 set_arg<i, keyT, ttg::Void>(
ttg::Void{});
2257 template <std::
size_t i>
2260 msg_t *msg =
static_cast<msg_t *
>(
data);
2261 if constexpr (!ttg::meta::is_void_v<keyT>) {
2264 auto rank = world.rank();
2266 pos =
unpack(key, msg->bytes, pos);
2267 assert(keymap(key) ==
rank);
2268 finalize_argstream<i>(key);
2270 auto rank = world.rank();
2271 assert(keymap() ==
rank);
2272 finalize_argstream<i>();
2276 template <std::
size_t i>
2279 auto msg =
static_cast<msg_t *
>(
data);
2281 if constexpr (!ttg::meta::is_void_v<keyT>) {
2283 auto rank = world.rank();
2285 pos =
unpack(key, msg->bytes, pos);
2286 assert(keymap(key) ==
rank);
2287 std::size_t argstream_size;
2288 pos =
unpack(argstream_size, msg->bytes, pos);
2289 set_argstream_size<i>(key, argstream_size);
2291 auto rank = world.rank();
2292 assert(keymap() ==
rank);
2293 std::size_t argstream_size;
2294 pos =
unpack(argstream_size, msg->bytes, pos);
2295 set_argstream_size<i>(argstream_size);
2299 template <std::
size_t i>
2302 msg_t *msg =
static_cast<msg_t *
>(
data);
2303 auto &
in = std::get<i>(input_terminals);
2304 if constexpr (!ttg::meta::is_void_v<keyT>) {
2308 pos =
unpack(key, msg->bytes, pos);
2309 set_arg<i>(key, (
in.container).get(key));
2313 template <std::
size_t i,
typename Key,
typename Value>
2314 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2315 const Key &key, Value &&value) {
2316 set_arg_local_impl<i>(key, std::forward<Value>(value));
2319 template <std::
size_t i,
typename Key = keyT,
typename Value>
2320 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2322 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2325 template <std::
size_t i,
typename Key,
typename Value>
2326 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2327 const Key &key,
const Value &value) {
2328 set_arg_local_impl<i>(key, value);
2331 template <std::
size_t i,
typename Key = keyT,
typename Value>
2332 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2333 const Value &value) {
2334 set_arg_local_impl<i>(
ttg::Void{}, value);
2337 template <std::
size_t i,
typename Key = keyT,
typename Value>
2338 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2339 std::shared_ptr<const Value> &valueptr) {
2340 set_arg_local_impl<i>(
ttg::Void{}, *valueptr);
2343 template <
typename Key>
2345 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2346 auto &world_impl = world.impl();
2349 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2350 int32_t priority = 0;
2351 if constexpr (!keyT_is_Void) {
2352 priority = priomap(key);
2354 newtask =
new (taskobj)
task_t(key, mempool, &this->
self, world_impl.taskpool(),
this, priority);
2356 priority = priomap();
2358 newtask =
new (taskobj)
task_t(mempool, &this->
self, world_impl.taskpool(),
this, priority);
2361 for (
int i = 0; i < static_stream_goal.size(); ++i) {
2362 newtask->
streams[i].goal = static_stream_goal[i];
2370 template <std::
size_t i>
2374 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2375 auto &world_impl = world.impl();
2378 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2380 int32_t priority = 0;
2381 if constexpr (!keyT_is_Void) {
2382 priority = priomap(task->
key);
2385 priority = priomap();
2390 world_impl.taskpool(), priority, is_first);
2397 template <std::
size_t i,
typename Key,
typename Value>
2399 parsec_task_t **task_ring =
nullptr) {
2400 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2401 constexpr
const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2402 constexpr
const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2403 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2406 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": received value for argument : ", i);
2408 parsec_key_t hk = 0;
2409 if constexpr (!keyT_is_Void) {
2410 hk =
reinterpret_cast<parsec_key_t
>(&key);
2411 assert(keymap(key) == world.rank());
2415 auto &world_impl = world.impl();
2416 auto &reducer = std::get<i>(input_reducers);
2418 bool remove_from_hash =
true;
2419 #if defined(PARSEC_PROF_GRAPHER)
2420 bool discover_task =
true;
2422 bool get_pull_data =
false;
2423 bool has_lock =
false;
2425 if (numins > 1 || reducer) {
2428 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
2430 world_impl.increment_created();
2433 if( world_impl.dag_profiling() ) {
2434 #if defined(PARSEC_PROF_GRAPHER)
2435 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2439 }
else if (!reducer && numins == (task->
in_data_count + 1)) {
2441 parsec_hash_table_nolock_remove(&
tasks_table, hk);
2442 remove_from_hash =
false;
2446 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2451 world_impl.increment_created();
2452 remove_from_hash =
false;
2453 if( world_impl.dag_profiling() ) {
2454 #if defined(PARSEC_PROF_GRAPHER)
2455 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2461 if( world_impl.dag_profiling() ) {
2462 #if defined(PARSEC_PROF_GRAPHER)
2467 if(orig_index >= 0) {
2468 snprintf(orig_str, 32,
"%d", orig_index);
2470 strncpy(orig_str,
"_", 32);
2472 snprintf(dest_str, 32,
"%lu", i);
2473 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2474 .flow_index = 0, .flow_datatype_mask = ~0 };
2475 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2476 .flow_index = 0, .flow_datatype_mask = ~0 };
2487 if (
nullptr != copy) {
2489 copy = detail::register_data_copy<valueT>(copy, task,
is_const);
2500 if (reducer && 1 != task->
streams[i].goal) {
2501 auto submit_reducer_task = [&](
auto *parent_task){
2503 std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2508 reduce_task = create_new_reducer_task<i>(parent_task,
false);
2513 if constexpr (!ttg::meta::is_void_v<valueT>) {
2516 if (
nullptr == (copy = task->
copies[i])) {
2517 using decay_valueT = std::decay_t<valueT>;
2522 reduce_task = create_new_reducer_task<i>(task,
true);
2526 task->
streams[i].reduce_count.store(1, std::memory_order_relaxed);
2542 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2545 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2551 parsec_lifo_push(&task->
streams[i].reduce_copies, ©->
super);
2552 submit_reducer_task(task);
2556 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2558 submit_reducer_task(task);
2568 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2571 if constexpr (!valueT_is_Void) {
2572 if (
nullptr != task->
copies[i]) {
2574 throw std::logic_error(
"bad set arg");
2594 if constexpr (!ttg::meta::is_void_v<keyT>) {
2595 if (get_pull_data) {
2602 bool constrained =
false;
2603 if (constraints_check.size() > 0) {
2604 if constexpr (ttg::meta::is_void_v<keyT>) {
2605 constrained = !constraints_check[0]();
2607 constrained = !constraints_check[0](task->
key);
2614 return !constrained;
2617 template<
typename Key = keyT>
2620 assert(cid < constraints_check.size());
2622 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2623 if (!constraints_check[i]()) {
2631 parsec_key_t hk = 0;
2633 assert(task !=
nullptr);
2634 auto &world_impl = world.impl();
2635 parsec_execution_stream_t *es = world_impl.execution_stream();
2636 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2637 __parsec_schedule_vp(es, vp_task_rings, 0);
// Releases constraint number `cid` for a batch of keys (non-void key types
// only). For every key the constraints that come after `cid` are consulted;
// tasks collected along the way are linked into one priority-sorted ring that
// is handed to the PaRSEC scheduler in a single call at the end.
// NOTE(review): this listing omits several lines of the function (what happens
// when a later constraint refuses the key, the task lookup, and the closing
// braces); the comments below describe only the statements that are visible.
2641 template<
typename Key = keyT>
2642 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
release_constraint(std::size_t cid,
const std::span<Key>& keys) {
2643 assert(cid < constraints_check.size());
// Head of the ring of tasks to schedule; stays null if no task is collected.
2644 parsec_task_t *task_ring =
nullptr;
2645 for (
auto& key : keys) {
// Only constraints with an index greater than `cid` are checked.
2648 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2649 if (!constraints_check[i](key)) {
// The address of the key is used as the PaRSEC hash key.
2657 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
// NOTE(review): `task` is obtained on a line missing from this listing,
// presumably by a hash-table lookup with `hk`.
2659 assert(task !=
nullptr);
// The first task starts the ring (lines not visible); later tasks are
// inserted in order of their `priority` field.
2660 if (task_ring ==
nullptr) {
2665 parsec_list_item_ring_push_sorted(&task_ring->super, &task->
parsec_task.super,
2666 offsetof(parsec_task_t, priority));
// Submit everything that was collected with one scheduler call.
2670 if (
nullptr != task_ring) {
2671 auto &world_impl = world.impl();
2672 parsec_execution_stream_t *es = world_impl.execution_stream();
2673 parsec_task_t *vp_task_rings[1] = { task_ring };
2674 __parsec_schedule_vp(es, vp_task_rings, 0);
2679 parsec_task_t **task_ring =
nullptr) {
2680 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2689 count = parsec_atomic_fetch_inc_int32(&task->
in_data_count) + 1;
2690 assert(count <=
self.dependencies_goal);
2693 auto &world_impl = world.impl();
2694 ttT *baseobj = task->
tt;
2696 if (count == numins) {
2697 parsec_execution_stream_t *es = world_impl.execution_stream();
2698 parsec_key_t hk = task->
pkey();
2700 if constexpr (!keyT_is_Void) {
2709 if (
nullptr == task_ring) {
2710 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2711 __parsec_schedule_vp(es, vp_task_rings, 0);
2712 }
else if (*task_ring ==
nullptr) {
2717 parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->
parsec_task.super,
2718 offsetof(parsec_task_t, priority));
2721 }
else if constexpr (!ttg::meta::is_void_v<keyT>) {
2722 if ((baseobj->num_pullins + count == numins) && baseobj->
is_lazy_pull()) {
2730 template <std::
size_t i,
typename Key,
typename Value>
2731 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
const Key &key,
2733 set_arg_impl<i>(key, std::forward<Value>(value));
2737 template <std::
size_t i,
typename Key,
typename Value>
2738 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(Value &&value) {
2739 set_arg_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2742 template <std::
size_t i,
typename Key = keyT>
2743 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_arg() {
2748 template <std::
size_t i,
typename Key>
2749 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
set_arg(
const Key &key) {
2753 template<
typename Value,
typename Key>
2760 using decvalueT = std::decay_t<Value>;
2761 bool inline_data =
false;
2763 std::size_t iov_size = 0;
2764 std::size_t metadata_size = 0;
2767 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2768 iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2769 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2770 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2779 std::size_t pack_size = key_pack_size + metadata_size + iov_size;
2787 template <std::
size_t i,
typename Key,
typename Value>
2790 using decvalueT = std::decay_t<Value>;
2791 using norefvalueT = std::remove_reference_t<Value>;
2792 norefvalueT *value_ptr = &value;
2794 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2795 if(world.impl().profiling()) {
2796 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2800 if constexpr (!ttg::meta::is_void_v<Key>)
2801 owner = keymap(key);
2804 if (owner == world.rank()) {
2805 if constexpr (!ttg::meta::is_void_v<keyT>)
2806 set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2808 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value), copy_in);
2809 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2810 if(world.impl().profiling()) {
2811 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2820 auto &world_impl = world.impl();
2823 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2826 if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2830 if (
nullptr == copy) {
2832 if (
nullptr == copy) {
2836 value_ptr =
static_cast<norefvalueT*
>(copy->
get_ptr());
2841 msg->tt_id.inline_data = inline_data;
2843 auto write_header_fn = [&]() {
2848 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
2849 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
2850 pos +=
sizeof(cbtag);
2853 auto handle_iovec_fn = [&](
auto&& iovec){
2857 std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
2858 pos += iovec.num_bytes;
2865 copy = detail::register_data_copy<decvalueT>(copy,
nullptr,
true);
2866 parsec_ce_mem_reg_handle_t lreg;
2869 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2870 iovec.num_bytes, &lreg, &lreg_size);
2871 auto lreg_ptr = std::shared_ptr<void>{lreg, [](
void *
ptr) {
2872 parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)
ptr;
2873 parsec_ce.mem_unregister(&memreg);
2875 int32_t lreg_size_i = lreg_size;
2876 std::memcpy(msg->bytes + pos, &lreg_size_i,
sizeof(lreg_size_i));
2877 pos +=
sizeof(lreg_size_i);
2878 std::memcpy(msg->bytes + pos, lreg, lreg_size);
2882 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
2888 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
2889 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
2890 pos +=
sizeof(fn_ptr);
2896 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2897 num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2899 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2900 pos =
pack(metadata, msg->bytes, pos);
2903 for (
auto&& iov : iovs) {
2904 handle_iovec_fn(iov);
2908 pos =
pack(*value_ptr, msg->bytes, pos, copy);
2918 msg->tt_id.num_iovecs = num_iovecs;
2922 msg->tt_id.num_keys = 0;
2923 msg->tt_id.key_offset = pos;
2924 if constexpr (!ttg::meta::is_void_v<Key>) {
2925 size_t tmppos =
pack(key, msg->bytes, pos);
2927 msg->tt_id.num_keys = 1;
2930 parsec_taskpool_t *tp = world_impl.taskpool();
2931 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2932 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2934 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
2936 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2937 if(world.impl().profiling()) {
2938 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2941 #if defined(PARSEC_PROF_GRAPHER)
2946 if(orig_index >= 0) {
2947 snprintf(orig_str, 32,
"%d", orig_index);
2949 strncpy(orig_str,
"_", 32);
2951 snprintf(dest_str, 32,
"%lu", i);
2952 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2953 .flow_index = 0, .flow_datatype_mask = ~0 };
2954 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2955 .flow_index = 0, .flow_datatype_mask = ~0 };
2963 template <
int i,
typename Iterator,
typename Value>
2965 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2966 if(world.impl().profiling()) {
2967 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2970 parsec_task_t *task_ring =
nullptr;
2976 for (
auto it = begin; it != end; ++it) {
2977 set_arg_local_impl<i>(*it, value, copy, &task_ring);
2980 if (
nullptr != task_ring) {
2981 parsec_task_t *vp_task_ring[1] = { task_ring };
2982 __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2984 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2985 if(world.impl().profiling()) {
2986 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2991 template <std::
size_t i,
typename Key,
typename Value>
2992 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2995 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2997 auto np = world.size();
2998 int rank = world.rank();
3000 bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3001 [&](
const Key &key) { return keymap(key) != rank; });
3004 using decvalueT = std::decay_t<Value>;
3007 std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
3008 std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](
const Key &a,
const Key &b)
mutable {
3009 int rank_a = keymap(a);
3010 int rank_b = keymap(b);
3012 int pos_a = (rank_a + np - rank) % np;
3013 int pos_b = (rank_b + np - rank) % np;
3014 return pos_a < pos_b;
3018 auto local_begin = keylist_sorted.end();
3019 auto local_end = keylist_sorted.end();
3021 int32_t num_iovs = 0;
3025 assert(
nullptr != copy);
3028 auto &world_impl = world.impl();
3029 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3034 bool inline_data =
can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
3035 msg->tt_id.inline_data = inline_data;
3037 std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
3038 auto write_iov_header = [&](){
3043 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
3044 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
3045 pos +=
sizeof(cbtag);
3048 auto handle_iov_fn = [&](
auto&& iovec){
3051 std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
3052 pos += iovec.num_bytes;
3054 parsec_ce_mem_reg_handle_t lreg;
3056 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
3057 iovec.num_bytes, &lreg, &lreg_size);
3059 memregs.push_back(std::make_pair(
static_cast<int32_t
>(lreg_size),
3061 std::shared_ptr<void>{lreg, [](
void *
ptr) {
3062 parsec_ce_mem_reg_handle_t memreg =
3063 (parsec_ce_mem_reg_handle_t)
ptr;
3065 parsec_ce.mem_unregister(&memreg);
3074 auto metadata = descr.get_metadata(value);
3075 pos =
pack(metadata, msg->bytes, pos);
3076 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(&value));
3077 num_iovs = std::distance(std::begin(iovs), std::end(iovs));
3078 memregs.reserve(num_iovs);
3080 for (
auto &&iov : iovs) {
3086 pos =
pack(value, msg->bytes, pos, copy);
3088 memregs.reserve(num_iovs);
3092 data->device_copies[
data->owner_device]->device_private});
3096 msg->tt_id.num_iovecs = num_iovs;
3098 std::size_t save_pos = pos;
3100 parsec_taskpool_t *tp = world_impl.taskpool();
3101 for (
auto it = keylist_sorted.begin(); it < keylist_sorted.end(); ) {
3103 auto owner = keymap(*it);
3104 if (owner ==
rank) {
3108 std::find_if_not(++it, keylist_sorted.end(), [&](
const Key &key) { return keymap(key) == rank; });
3121 for (
int idx = 0; idx < num_iovs; ++idx) {
3124 std::shared_ptr<void> lreg_ptr;
3125 std::tie(lreg_size, lreg_ptr) = memregs[idx];
3126 std::memcpy(msg->bytes + pos, &lreg_size,
sizeof(lreg_size));
3127 pos +=
sizeof(lreg_size);
3128 std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3132 copy = detail::register_data_copy<valueT>(copy,
nullptr,
true);
3134 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
3140 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
3141 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
3142 pos +=
sizeof(fn_ptr);
3147 msg->tt_id.key_offset = pos;
3153 pos =
pack(*it, msg->bytes, pos);
3155 }
while (it < keylist_sorted.end() && keymap(*it) == owner);
3156 msg->tt_id.num_keys = num_keys;
3158 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3159 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3161 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3165 broadcast_arg_local<i>(local_begin, local_end, value);
3168 broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
3175 template <
typename Key,
typename... Ts,
size_t... Is,
size_t... Js>
3176 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>,
3177 std::index_sequence<Js...>,
const Key &key,
3178 const std::tuple<Ts...> &args) {
3179 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3180 constexpr
size_t js[] = {Js...};
3181 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
// Convenience overload for keyed tasks: forwards to the two-sequence set_args,
// using index_sequence_for<Ts...> as the tuple indices and `is` as the terminal indices.
3187 template <
typename Key,
typename... Ts,
size_t... Is>
3188 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
const Key &key,
3189 const std::tuple<Ts...> &args) {
3190 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
// Void-key counterpart of the two-sequence set_args: no key is passed.
// Is... index the `args` tuple, Js... give the receiving input terminal for each element;
// both packs must have the same length (checked by the static_assert).
3196 template <
typename Key = keyT,
typename... Ts,
size_t... Is,
size_t... Js>
3197 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3198 const std::tuple<Ts...> &args) {
3199 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3200 constexpr
size_t js[] = {Js...};
// One set_arg<js[Is], void> call per tuple element, driven by the pack expansion.
3201 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
// Convenience overload for void-key tasks: forwards to the two-sequence set_args,
// using index_sequence_for<Ts...> as the tuple indices and `is` as the terminal indices.
3207 template <
typename Key = keyT,
typename... Ts,
size_t... Is>
3208 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
3209 const std::tuple<Ts...> &args) {
3210 set_args(std::index_sequence_for<Ts...>{}, is, args);
// Sets the stream size stored in static_stream_goal[i] for streaming input terminal i.
// Asserts that terminal i has an input reducer and that size is non-zero.
3216 template <std::
size_t i>
3218 assert(std::get<i>(input_reducers) &&
"TT::set_static_argstream_size called on nonstreaming input terminal");
3219 assert(
size > 0 &&
"TT::set_static_argstream_size(key,size) called with size=0");
3221 this->
trace(world.rank(),
":",
get_name(),
": setting global stream size for terminal ", i);
// A goal below numeric_limits<size_t>::max() is treated as "already bounded".
// NOTE(review): the runtime_error below appears to be raised inside this branch - confirm.
3224 if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3226 throw std::runtime_error(
"TT::set_static_argstream_size called for a bounded stream");
3229 static_stream_goal[i] =
size;
// Sets the stream size of streaming input terminal i for the task identified by `key`.
// Asserts that terminal i has an input reducer and that size is non-zero.
// If keymap(key) is not this rank the request is packed into a msg_t and sent to the owner;
// otherwise the task is looked up in tasks_table and handled locally.
3235 template <std::
size_t i,
typename Key>
3238 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3239 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3242 const auto owner = keymap(key);
// Remote owner: forward the request as an active message.
3243 if (owner != world.rank()) {
3244 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : forwarding stream size for terminal ", i);
3246 auto &world_impl = world.impl();
// The message is constructed with a key count of 1 (last constructor argument).
3248 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3250 world_impl.rank(), 1);
3252 pos =
pack(key, msg->bytes, pos);
// Notify the taskpool's tdm module about the outgoing message before sending it.
3254 parsec_taskpool_t *tp = world_impl.taskpool();
3255 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3256 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3257 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
// Local owner path.
3260 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : setting stream size to ",
size,
" for terminal ", i);
// The hash-table key is the address of `key` reinterpreted as parsec_key_t.
3262 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
// No task is registered under this key yet.
3265 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3267 world.impl().increment_created();
3269 if( world.impl().dag_profiling() ) {
3270 #if defined(PARSEC_PROF_GRAPHER)
3271 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3275 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
// NOTE(review): reduce_count is incremented and later decremented - presumably to guard
// the stream while its size is updated; confirm.
3285 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3287 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// Void-key variant: sets the stream size of streaming input terminal i.
// Asserts that terminal i has an input reducer and that size is non-zero.
// The owner comes from keymap() with no argument; a remote owner receives a message
// constructed with a key count of 0, a local owner uses hash key 0 in tasks_table.
3296 template <std::
size_t i,
typename Key = keyT>
3299 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3300 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3303 const auto owner = keymap();
// Remote owner: forward the request as an active message.
3304 if (owner != world.rank()) {
3305 ttg::trace(world.rank(),
":",
get_name(),
" : forwarding stream size for terminal ", i);
3307 auto &world_impl = world.impl();
3309 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3311 world_impl.rank(), 0);
// Notify the taskpool's tdm module about the outgoing message before sending it.
3313 parsec_taskpool_t *tp = world_impl.taskpool();
3314 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3315 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3316 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
// Local owner path: the single void-key task is stored under hash key 0.
3321 parsec_key_t hk = 0;
// No task is registered under this key yet.
3324 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3326 world.impl().increment_created();
3328 if( world.impl().dag_profiling() ) {
3329 #if defined(PARSEC_PROF_GRAPHER)
3330 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3334 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
// NOTE(review): reduce_count is incremented and later decremented - presumably to guard
// the stream while its size is updated; confirm.
3344 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3346 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// Finalizes the stream of streaming input terminal i for the task identified by `key`.
// Asserts that terminal i has an input reducer.
// If keymap(key) is not this rank the request is forwarded to the owner as a message;
// otherwise the stream is finalized locally. A std::runtime_error is thrown when finalize
// is called on a stream that never received input data.
3354 template <std::
size_t i,
typename Key>
3357 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3360 const auto owner = keymap(key);
// Remote owner: forward the finalize request.
3361 if (owner != world.rank()) {
3362 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": forwarding stream finalize for terminal ", i);
3364 auto &world_impl = world.impl();
// The message is constructed with a key count of 1 (last constructor argument).
3366 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3368 world_impl.rank(), 1);
3370 pos =
pack(key, msg->bytes, pos);
3371 parsec_taskpool_t *tp = world_impl.taskpool();
3372 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3373 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3374 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
// Local owner path.
3377 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": finalizing stream for terminal ", i);
// The hash-table key is the address of `key` reinterpreted as parsec_key_t.
3379 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3384 " : error finalize called on stream that never received an input data: ", i);
3385 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3396 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
// fetch_sub returns the previous count: c == 1 means the count dropped to zero here.
3398 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3399 if (1 == c && (task->
streams[i].size >= 1)) {
// Void-key variant: finalizes the stream of streaming input terminal i.
// Asserts that terminal i has an input reducer. The owner comes from keymap() with no
// argument; a remote owner receives a message constructed with a key count of 0, a local
// owner uses hash key 0. A std::runtime_error is thrown when finalize is called on a
// stream that never received input data.
3406 template <std::
size_t i,
bool key_is_
void = ttg::meta::is_
void_v<keyT>>
3409 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3412 const auto owner = keymap();
// Remote owner: forward the finalize request.
3413 if (owner != world.rank()) {
3414 ttg::trace(world.rank(),
":",
get_name(),
": forwarding stream finalize for terminal ", i);
3416 auto &world_impl = world.impl();
3418 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3420 world_impl.rank(), 0);
3421 parsec_taskpool_t *tp = world_impl.taskpool();
3422 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3423 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3424 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
// Local owner path: the single void-key task is stored under hash key 0.
3429 auto hk =
static_cast<parsec_key_t
>(0);
3433 " : error finalize called on stream that never received an input data: ", i);
3434 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3445 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
// fetch_sub returns the previous count: c == 1 means the count dropped to zero here.
3447 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3448 if (1 == c && (task->
streams[i].size >= 1)) {
// Helper templated on the value type that inspects parsec_data_t objects through the
// check_parsec_data lambda. For data whose owner_device is non-zero the lambda scans the
// flows of gpu_task, throws if all MAX_PARAM_COUNT flows are in use, initialises an unused
// flow slot (flags become RW, element count is recorded) and sets the slot's bit in
// gpu_task->pushout.
3454 template<
typename Value>
3459 auto check_parsec_data = [&](parsec_data_t*
data) {
// NOTE(review): owner_device 0 is presumably the host/CPU device - confirm.
3460 if (
data->owner_device != 0) {
// Advance over flows whose flags are not PARSEC_FLOW_ACCESS_NONE (i.e. already used).
3463 while (flowidx < MAX_PARAM_COUNT &&
3464 gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3471 if (flowidx == MAX_PARAM_COUNT) {
3472 throw std::runtime_error(
"Cannot add more than MAX_PARAM_COUNT flows to a task!");
// Unused slot found: record the data size for it.
3474 if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3477 gpu_task->flow_nb_elts[flowidx] =
data->nb_elts;
// The flow is accessed through a cast to a non-const parsec_flow_t to set RW access.
3480 ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3481 gpu_task->pushout |= 1<<flowidx;
3485 [&](parsec_data_t*
data){
3486 check_parsec_data(
data);
// Decides whether the value about to be sent on terminal i needs a push-out
// (tracked in the local flag need_pushout). Enabled only for non-void Value types.
// remote_check is a caller-supplied callable; it is consulted only when no earlier rule
// set need_pushout and the device is not flagged as supported.
3492 template <std::
size_t i,
typename Value,
typename RemoteCheckFn>
3493 std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
// True when the i-th element of input_args_type is const-qualified.
3496 constexpr
const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
3503 if (
nullptr == copy) {
3508 bool need_pushout =
false;
3516 auto &reducer = std::get<i>(input_reducers);
3524 if constexpr (value_is_const) {
3543 need_pushout =
true;
3550 need_pushout =
true;
3554 need_pushout =
true;
3561 need_pushout =
true;
// Fallback: nothing above requested a push-out.
3565 if (!need_pushout) {
3566 bool device_supported =
false;
3576 if (!device_supported) {
3577 need_pushout = remote_check();
// Keyed prepare-send: builds a remote_check lambda that looks for any key in `keylist`
// whose keymap() result differs from this rank, then delegates to do_prepare_send<i>.
// Enabled only for non-void Key and non-void Value.
3588 template <std::
size_t i,
typename Key,
typename Value>
3589 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3592 auto remote_check = [&](){
3594 int rank = world.rank();
// `remote` is true as soon as one key maps to a different rank.
3595 bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3596 [&](
const Key &key) { return keymap(key) != rank; });
3599 do_prepare_send<i>(value, remote_check);
// Void-key prepare-send: the remote_check lambda reports whether keymap() (no argument)
// differs from this rank, then the call delegates to do_prepare_send<i>.
// Enabled only for void Key and non-void Value.
3602 template <std::
size_t i,
typename Key,
typename Value>
3603 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3606 auto remote_check = [&](){
3608 int rank = world.rank();
3609 return (keymap() !=
rank);
3611 do_prepare_send<i>(value, remote_check);
// TT objects are neither copyable nor movable: all four copy/move operations are deleted.
3624 TT(
const TT &other) =
delete;
3625 TT &operator=(
const TT &other) =
delete;
3626 TT(
TT &&other) =
delete;
3627 TT &operator=(
TT &&other) =
delete;
// Installs the callbacks of input terminal number i on `input`.
// Four compile-time cases are handled, depending on whether keyT and the terminal's
// decayed value type are void; each case builds lambdas capturing `this` that forward to
// set_arg / broadcast_arg / prepare_send / set_argstream_size / finalize_argstream and
// passes them to input.set_callback.
3630 template <
typename terminalT, std::
size_t i>
3631 void register_input_callback(terminalT &input) {
3632 using valueT = std::decay_t<typename terminalT::value_type>;
3633 if (input.is_pull_terminal) {
// Case 1: non-void key, non-void value.
3639 if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3640 auto move_callback = [
this](
const keyT &key, valueT &&value) {
3641 set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3643 auto send_callback = [
this](
const keyT &key,
const valueT &value) {
3644 set_arg<i, keyT, const valueT &>(key, value);
3646 auto broadcast_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3647 broadcast_arg<i, keyT, valueT>(keylist, value);
3649 auto prepare_send_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3650 prepare_send<i, keyT, valueT>(keylist, value);
3652 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3653 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3654 input.set_callback(send_callback, move_callback, broadcast_callback,
3655 setsize_callback, finalize_callback, prepare_send_callback);
// Case 2: non-void key, void value - a ttg::Void placeholder is passed as the value and
// the same lambda serves as both the send and the move callback.
3660 else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3661 auto send_callback = [
this](
const keyT &key) { set_arg<i, keyT, ttg::Void>(key,
ttg::Void{}); };
3662 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3663 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3664 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
// Case 3: void key, non-void value.
3673 else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3674 auto move_callback = [
this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3675 auto send_callback = [
this](
const valueT &value) {
// Sending by const reference needs a copy; non-copyable types raise std::logic_error.
3676 if constexpr (std::is_copy_constructible_v<valueT>) {
3677 set_arg<i, keyT, const valueT &>(value);
3680 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") +
typeid(std::decay_t<valueT>).name() +
" which is not copy constructible, std::move datum into send/broadcast statement");
3683 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3684 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3685 auto prepare_send_callback = [
this](
const valueT &value) {
3686 prepare_send<i, void>(value);
3688 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
// Case 4: void key, void value.
3693 else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3694 auto send_callback = [
this]() { set_arg<i, keyT, ttg::Void>(
ttg::Void{}); };
3695 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3696 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3697 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
// Calls register_input_callback once per index in IS..., passing the IS-th element of
// input_terminals together with its terminal type.
3707 template <std::size_t... IS>
3708 void register_input_callbacks(std::index_sequence<IS...>) {
3711 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
// For every index in IS..., calls set_out on the IS-th incoming edge with the address of
// the IS-th input terminal of this TT.
3716 template <std::size_t... IS,
typename inedgesT>
3717 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3718 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
// For every index in IS..., calls set_in on the IS-th outgoing edge with the address of
// the IS-th output terminal of this TT.
3722 template <std::size_t... IS,
typename outedgesT>
3723 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3724 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
// Assigns flow_flags for each flow index in IS...: PARSEC_FLOW_ACCESS_READ when the
// matching element of input_terminals_tupleT is const-qualified, PARSEC_FLOW_ACCESS_RW
// otherwise. The const_cast strips constness from the flow_flags member so it can be written.
3729 template <
typename input_terminals_tupleT, std::size_t... IS,
typename flowsT>
3730 void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3732 (*(
const_cast<std::remove_const_t<decltype(flows[IS]-
>flow_flags)> *>(&(flows[IS]->flow_flags))) =
3733 (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3734 : PARSEC_FLOW_ACCESS_RW),
// Entry point for flow initialisation: delegates to _initialize_flows for the given
// terminal tuple type.
3739 template <
typename input_terminals_tupleT,
typename flowsT>
3740 void initialize_flows(flowsT &&flows) {
3741 _initialize_flows<input_terminals_tupleT>(
// Key-equality callback (first entry of tasks_hash_fcts).
// For non-void keyT both parsec_key_t values are reinterpreted as pointers to keyT and
// dereferenced; keyT == void takes a separate compile-time branch.
3748 static int key_equal(parsec_key_t a, parsec_key_t b,
void *user_data) {
3749 if constexpr (std::is_same_v<keyT, void>) {
3752 keyT &ka = *(
reinterpret_cast<keyT *
>(a));
3753 keyT &kb = *(
reinterpret_cast<keyT *
>(b));
// Key-hash callback (third entry of tasks_hash_fcts).
// Void keys (per ttg::meta::is_void_v or plain void) take a separate compile-time branch;
// otherwise `k` is reinterpreted as a pointer to keyT and hashed with hash<keyT>.
3758 static uint64_t key_hash(parsec_key_t k,
void *user_data) {
3759 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3760 if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3763 keyT &kk = *(
reinterpret_cast<keyT *
>(k));
3765 uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
// Key-printing callback (second entry of tasks_hash_fcts).
// For non-void keyT the key is copied out of the pointer held in `k`; the buffer is
// zero-filled and then filled from a std::stringstream via get(), which writes at most
// buffer_size - 1 characters.
3770 static char *key_print(
char *buffer,
size_t buffer_size, parsec_key_t k,
void *user_data) {
3771 if constexpr (std::is_same_v<keyT, void>) {
3775 keyT kk = *(
reinterpret_cast<keyT *
>(k));
3776 std::stringstream iss;
3778 memset(buffer, 0, buffer_size);
3779 iss.get(buffer, buffer_size);
// make_key callback installed on the task classes: reads a keyT* stored in the
// assignment array starting at as[2] and returns it as a parsec_key_t.
// NOTE(review): assumes the key pointer is stored in the locals at index 2 - confirm
// against the code that fills the locals.
3784 static parsec_key_t make_key(
const parsec_taskpool_t *tp,
const parsec_assignment_t *as) {
3786 keyT *key = *(keyT**)&(as[2]);
3787 return reinterpret_cast<parsec_key_t
>(key);
// task_snprintf callback: writes a textual description of a task into `buffer`.
// Void-key tasks are printed as "<class name>()[]<priority>"; keyed tasks as
// "<class name>(<key>)[]<priority>", with '(' and ')' inside the key text replaced by ':'.
// A zero buffer_size is handled up front.
3790 static char *parsec_ttg_task_snprintf(
char *buffer,
size_t buffer_size,
const parsec_task_t *parsec_task) {
3791 if(buffer_size == 0)
3794 if constexpr (ttg::meta::is_void_v<keyT>) {
3795 snprintf(buffer, buffer_size,
"%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3797 const task_t *task =
reinterpret_cast<const task_t*
>(parsec_task);
3798 std::stringstream ss;
3801 std::string keystr = ss.str();
// Parentheses in the key text are replaced so they cannot clash with the surrounding "(...)".
3802 std::replace(keystr.begin(), keystr.end(),
'(',
':');
3803 std::replace(keystr.begin(), keystr.end(),
')',
':');
3805 snprintf(buffer, buffer_size,
"%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
// Profiling-only callback (compiled under PARSEC_PROF_TRACE), installed as profile_info.
// Writes at most `size` bytes of task description into `dst`: "()" for void-key tasks,
// otherwise the contents of a std::stringstream built for the task.
3810 #if defined(PARSEC_PROF_TRACE)
3811 static void *parsec_ttg_task_info(
void *dst,
const void *
data,
size_t size)
3813 const task_t *task =
reinterpret_cast<const task_t *
>(
data);
3815 if constexpr (ttg::meta::is_void_v<keyT>) {
3816 snprintf(
reinterpret_cast<char*
>(dst),
size,
"()");
3818 std::stringstream ss;
3820 snprintf(
reinterpret_cast<char*
>(dst),
size,
"%s", ss.str().c_str());
// Key function table handed to PaRSEC (see key_functions assignments): equality, print, hash.
3826 parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
// complete_execution callback of the task class.
// With coroutine/device support, a task holding a suspended coroutine first has its pending
// sends issued (state TTG_DEVICE_CORO_SENDOUT) and its suspended_task_address cleared.
// Afterwards the task's data-copy slots are cleared and the TT's constraints_complete
// callbacks are iterated. Returns PARSEC_HOOK_RETURN_DONE.
3828 static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3832 task_t *task = (task_t*)parsec_task;
3834 #ifdef TTG_HAVE_COROUTINE
3836 if (task->suspended_task_address) {
3838 #ifdef TTG_HAVE_DEVICE
3841 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
// NOTE(review): `auto` (not `auto&`) - if promise() returns a reference this copies the
// promise object; confirm that is intended.
3844 auto dev_data = dev_task.promise();
3846 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3847 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3850 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
// Install this TT's outputs TLS pointer around do_sends(), then restore the previous value.
3853 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3854 task->tt->set_outputs_tls_ptr();
3855 dev_data.do_sends();
3856 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3863 task->suspended_task_address =
nullptr;
// Walk the task's data copies; null entries are skipped and handled entries are cleared.
3868 for (
int i = 0; i < task->data_count; i++) {
3869 detail::ttg_data_copy_t *copy = task->
copies[i];
3870 if (
nullptr == copy)
continue;
3872 task->copies[i] =
nullptr;
3875 for (
auto& c : task->tt->constraints_complete) {
3876 if constexpr(std::is_void_v<keyT>) {
3882 return PARSEC_HOOK_RETURN_DONE;
// Primary constructor: names the TT, checks that the number of input/output names matches
// the number of terminals (std::logic_error otherwise), registers the TT with the world,
// registers input callbacks, and fills in the PaRSEC task class `self` (name, parameter and
// local counts, key functions, incarnations, flows, completion hooks) as well as the task
// mempool.
// When keymapT is the default keymap type, a default keymap bound to `world` is used in
// place of keymap_.
3886 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3887 typename priomapT = ttg::detail::default_priomap<keyT>>
3888 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3889 ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3890 :
ttg::
TTBase(name, numinedges, numouts)
3893 , keymap(std::is_same<keymapT,
ttg::detail::default_keymap<keyT>>::value
3894 ? decltype(keymap)(
ttg::detail::default_keymap<keyT>(world))
3895 : decltype(keymap)(std::forward<keymapT>(keymap_)))
// NOTE(review): priomap is constructed through decltype(keymap), not decltype(priomap) -
// presumably both members share a type; confirm.
3896 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3898 if (innames.size() != numinedges)
throw std::logic_error(
"ttg_parsec::TT: #input names != #input terminals");
3899 if (outnames.size() != numouts)
throw std::logic_error(
"ttg_parsec::TT: #output names != #output terminals");
3901 auto &world_impl = world.
impl();
3902 world_impl.register_op(
this);
3904 if constexpr (numinedges == numins) {
3912 register_input_callbacks(std::make_index_sequence<numinedges>{});
// Start from a zeroed task class, then fill in the fields below.
3915 memset(&
self, 0,
sizeof(parsec_task_class_t));
// strdup'ed copy of the TT name.
3917 self.name = strdup(
get_name().c_str());
3919 self.nb_parameters = 0;
3922 self.nb_flows = MAX_PARAM_COUNT;
3925 if( world_impl.profiling() ) {
// Number of ints needed to hold one pointer (rounded up).
3927 self.nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
// Locals: the parameters plus room for one more pointer, counted in ints.
3929 self.nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3940 self.make_key = make_key;
3941 self.key_functions = &tasks_hash_fcts;
3942 self.task_snprintf = parsec_ttg_task_snprintf;
3944 #if defined(PARSEC_PROF_TRACE)
3945 self.profile_info = &parsec_ttg_task_info;
// Grow the taskpool's class count so it covers this class id.
3948 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
// Incarnation tables: one variant per device kind (CUDA, HIP, Level Zero, CPU), each ended
// by a PARSEC_DEV_NONE entry with null hooks.
// NOTE(review): the GPU variants allocate 3 entries while entries [0] and [1] are the ones
// written here - confirm the intended size.
3953 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3954 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
3955 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3956 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3957 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3958 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3959 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3961 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3962 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_HIP;
3963 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3964 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_hip<TT>;
3966 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3967 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3968 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3969 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3971 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3972 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3973 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3974 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3976 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3977 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3978 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
// CPU-only variant: a single CPU chore followed by the terminator.
3981 self.incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
3982 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CPU;
3983 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
3984 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook<TT>;
3985 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3986 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3987 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3991 self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3992 self.complete_execution = complete_task_and_release;
// Input flows: one parsec_flow_t per slot, named "flow in<i>", datatype mask with all bits set.
3994 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3995 parsec_flow_t *flow =
new parsec_flow_t;
3996 flow->name = strdup((std::string(
"flow in") + std::to_string(i)).c_str());
3997 flow->sym_type = PARSEC_SYM_INOUT;
4000 flow->dep_in[0] = NULL;
4001 flow->dep_out[0] = NULL;
4002 flow->flow_index = i;
4003 flow->flow_datatype_mask = ~0;
// self.in[] is written through a cast that removes the pointer's constness.
4004 *((parsec_flow_t **)&(
self.
in[i])) = flow;
// Output flows: named "flow out<i>", READ access, datatype mask with only bit i set.
4009 for (i = 0; i < MAX_PARAM_COUNT; i++) {
4010 parsec_flow_t *flow =
new parsec_flow_t;
4011 flow->name = strdup((std::string(
"flow out") + std::to_string(i)).c_str());
4012 flow->sym_type = PARSEC_SYM_INOUT;
4013 flow->flow_flags = PARSEC_FLOW_ACCESS_READ;
4014 flow->dep_in[0] = NULL;
4015 flow->dep_out[0] = NULL;
4016 flow->flow_index = i;
4017 flow->flow_datatype_mask = (1 << i);
4018 *((parsec_flow_t **)&(
self.
out[i])) = flow;
4023 self.dependencies_goal = numins;
// Sum the core counts of all virtual processes to size the task mempool.
4026 auto *context = world_impl.context();
4027 for (
int i = 0; i < context->nb_vp; i++) {
4028 nbthreads += context->virtual_processes[i]->nb_cores;
4031 parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t),
sizeof(
task_t),
4032 offsetof(parsec_task_t, mempool_owner), nbthreads);
// Constructor overload taking name, input names and output names; priomap is perfectly
// forwarded to another constructor and the body is empty.
4041 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4042 typename priomapT = ttg::detail::default_priomap<keyT>>
4043 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4046 std::forward<priomapT>(priomap)) {}
// Edge-connecting constructor: delegates to the (name, innames, outnames, world, keymap,
// priomap) constructor, then wires the TT's input terminals to `inedges` and its output
// terminals to `outedges`.
4048 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4049 typename priomapT = ttg::detail::default_priomap<keyT>>
4051 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
ttg::World world,
4052 keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
4053 :
TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
4054 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
4055 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
// NOTE(review): the delegated-to constructor also calls register_input_callbacks - confirm
// the second registration after connecting the edges is intended.
4057 if constexpr (numinedges > 0) {
4058 register_input_callbacks(std::make_index_sequence<numinedges>{});
// Constructor overload that perfectly forwards keymap and priomap to another constructor;
// the body is empty.
4061 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4062 typename priomapT = ttg::detail::default_priomap<keyT>>
4064 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4067 std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
// Cleanup of C-allocated members: frees the strdup'ed task-class name and every
// reducer task class stored in inpute_reducers_taskclass, nulling each pointer afterwards.
4071 if(
nullptr !=
self.name ) {
4072 free((
void*)
self.name);
4073 self.name =
nullptr;
4076 for (std::size_t i = 0; i < numins; ++i) {
4077 if (inpute_reducers_taskclass[i] !=
nullptr) {
4078 std::free(inpute_reducers_taskclass[i]);
4079 inpute_reducers_taskclass[i] =
nullptr;
// Reports a task that is still present: cb_data is interpreted as the owning TT and a
// "Left over task" line is printed to std::cout, including the task key for keyed TTs.
4087 ttT *op = (
ttT *)cb_data;
4088 if constexpr (!ttg::meta::is_void_v<keyT>) {
4089 std::cout <<
"Left over task " << op->
get_name() <<
" " << task->
key << std::endl;
4091 std::cout <<
"Left over task " << op->
get_name() << std::endl;
// Releases PaRSEC-side resources of this TT: destroys the task mempool, frees the
// incarnations array, frees the name of every in/out flow and nulls the flow pointers,
// then deregisters the TT from the world.
4109 parsec_mempool_destruct(&mempools);
4112 free((__parsec_chore_t *)
self.incarnations);
4113 for (
int i = 0; i < MAX_PARAM_COUNT; i++) {
4114 if (NULL !=
self.
in[i]) {
4115 free(
self.
in[i]->name);
4117 self.in[i] =
nullptr;
4119 if (NULL !=
self.
out[i]) {
4120 free(
self.
out[i]->name);
4122 self.out[i] =
nullptr;
4125 world.
impl().deregister_op(
this);
// Installs `reducer` for input terminal i and sets up the PaRSEC task class used to run
// it. The task class is calloc'ed on first use, cached in inpute_reducers_taskclass[i],
// and given a CPU incarnation whose hook is static_reducer_op<i>.
4135 template <std::
size_t i,
typename Reducer>
4138 std::get<i>(input_reducers) = reducer;
4140 parsec_task_class_t *tc = inpute_reducers_taskclass[i];
// First reducer for this terminal: allocate a zero-initialised task class.
4141 if (
nullptr == tc) {
4142 tc = (parsec_task_class_t *)std::calloc(1,
sizeof(*tc));
4143 inpute_reducers_taskclass[i] = tc;
4145 tc->name = strdup((
get_name() + std::string(
" reducer ") + std::to_string(i)).c_str());
4147 tc->nb_parameters = 0;
4149 tc->nb_flows = numflows;
4151 auto &world_impl = world.
impl();
4153 if( world_impl.profiling() ) {
// Number of ints needed to hold one pointer (rounded up).
4155 tc->nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
// NOTE(review): uses self.nb_parameters rather than tc->nb_parameters - confirm intended.
4157 tc->nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4168 tc->make_key = make_key;
4169 tc->key_functions = &tasks_hash_fcts;
4170 tc->task_snprintf = parsec_ttg_task_snprintf;
4172 #if defined(PARSEC_PROF_TRACE)
4173 tc->profile_info = &parsec_ttg_task_info;
// NOTE(review): this uses self.task_class_id, not a field of tc - confirm intended.
4176 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
// NOTE(review): the statements below write self.incarnations rather than tc->incarnations;
// confirm whether this section is compiled at all (a preprocessor guard may apply).
4181 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
4182 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
4183 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
4185 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_CPU;
4186 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
4187 ((__parsec_chore_t *)
self.incarnations)[1].hook =
detail::hook;
4188 ((__parsec_chore_t *)
self.incarnations)[2].type = PARSEC_DEV_NONE;
4189 ((__parsec_chore_t *)
self.incarnations)[2].evaluate = NULL;
4190 ((__parsec_chore_t *)
self.incarnations)[2].hook = NULL;
// Reducer incarnations: CPU chore running static_reducer_op<i>, then the terminator entry.
4194 tc->incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
4195 ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4196 ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4197 ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4198 ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4199 ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4200 ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4204 tc->release_task = &parsec_release_task_to_mempool;
4205 tc->complete_execution = NULL;
// Overload that installs the reducer for terminal i and also fixes the static stream size:
// forwards to set_input_reducer<i> and then set_static_argstream_size<i>.
4216 template <std::
size_t i,
typename Reducer>
4218 set_input_reducer<i>(std::forward<Reducer>(reducer));
4219 set_static_argstream_size<i>(
size);
// Returns a pointer to the i-th input terminal of this TT.
4224 template <std::
size_t i>
4225 std::tuple_element_t<i, input_terminals_type> *
in() {
4226 return &std::get<i>(input_terminals);
// Returns a pointer to the i-th output terminal of this TT.
4231 template <std::
size_t i>
4232 std::tuple_element_t<i, output_terminalsT> *
out() {
4233 return &std::get<i>(output_terminals);
// invoke for keyed TTs with at least one input value: sets the non-void inputs from `args`
// and then the void inputs from a tuple produced by make_void_tuple, both for `key`.
4237 template <
typename Key = keyT>
4238 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
// Compile-time branch for a Key type that differs from key_type.
4241 if constexpr(!std::is_same_v<Key, key_type>) {
4246 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4248 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4249 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
// invoke for void-key TTs with at least one input value: sets the non-void inputs from
// `args` and then the void inputs from a tuple produced by make_void_tuple.
4254 template <
typename Key = keyT>
4255 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4259 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
4261 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4262 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
// invoke for keyed TTs without input values: only the void inputs are set for `key`,
// using a tuple produced by make_void_tuple.
4266 template <
typename Key = keyT>
4267 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
// Compile-time branch for a Key type that differs from key_type.
4271 if constexpr(!std::is_same_v<Key, key_type>) {
4276 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4277 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
// invoke for void-key TTs without input values: sets the void inputs from a tuple
// produced by make_void_tuple.
4282 template <
typename Key = keyT>
4283 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke() {
4286 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4287 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
// Compile-time guard: taken only for TTs with a void key and no input values.
4292 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
// Recursive helper for the variadic invoke: handles `arg` for input index I, then recurses
// on the remaining indices and arguments.
// Pointer-like arguments (ttg::meta::is_ptr_v) are passed through set_arg_impl together
// with their data copy; all other arguments are forwarded to set_arg<I>.
4299 template<
typename Key,
typename Arg,
typename... Args, std::size_t I, std::size_t... Is>
4300 void invoke_arglist(std::index_sequence<I, Is...>,
const Key& key, Arg&& arg, Args&&... args) {
4301 using arg_type = std::decay_t<Arg>;
4302 if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4307 copy->reset_readers();
4309 set_arg_impl<I>(key, val, copy);
4311 if constexpr (std::is_rvalue_reference_v<Arg>) {
4315 }
else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4316 set_arg<I>(key, std::forward<Arg>(arg));
// Recurse while arguments remain.
4318 if constexpr (
sizeof...(Is) > 0) {
4320 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
// Variadic invoke for keyed TTs with input values: takes one argument per task input
// (enforced by the static_assert), hands the arguments to invoke_arglist and finally sets
// the void inputs from a tuple produced by make_void_tuple.
4326 template <
typename Key = keyT,
typename Arg,
typename... Args>
4327 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4328 const Key &key, Arg&& arg, Args&&... args) {
4329 static_assert(
sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4330 "Number of arguments to invoke must match the number of task inputs.");
4333 invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4334 std::forward<Arg>(arg), std::forward<Args>(args)...);
4337 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4338 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
// Stores the requested setting in m_defer_writer.
4342 m_defer_writer = value;
// Returns the current m_defer_writer setting.
4346 return m_defer_writer;
// Registers this TT with the world implementation's profiling support.
4351 world.
impl().register_tt_profiling(
this);
// Member template parameterised on the Keymap callable type.
4361 template <
typename Keymap>
// Replaces the priority map of this TT with `pm` (perfectly forwarded).
4372 template <
typename Priomap>
4374 priomap = std::forward<Priomap>(pm);
// Replaces the device map of this TT. One path stores `dm` directly (perfectly forwarded);
// another wraps it in a by-copy-capturing lambda taking the key, which throws
// std::runtime_error("Unknown device type!") for unhandled device types.
4382 template<
typename Devicemap>
4387 devicemap = std::forward<Devicemap>(dm);
4390 devicemap = [=](
const keyT& key) {
4398 throw std::runtime_error(
"Unknown device type!");
// Adds constraint `c` to this TT. The constraint's index (cid) is the current size of
// constraints_check; check and complete callbacks capturing `c` and `this` are appended to
// constraints_check / constraints_complete. For keyed TTs a listener is registered on `c`
// that calls release_constraint(cid, keys).
4411 template<
typename Constra
int>
4413 std::size_t cid = constraints_check.size();
4414 if constexpr(ttg::meta::is_void_v<keyT>) {
4416 constraints_check.push_back([c,
this](){
return c->check(
this); });
// NOTE(review): this void-key completion lambda takes `const keyT& key`, whereas the
// mapper overload's void-key completion lambda takes no argument - confirm which is intended.
4417 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(
this);
return true; });
4419 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4420 constraints_check.push_back([c,
this](
const keyT& key){
return c->check(key,
this); });
4421 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(key,
this);
return true; });
// Convenience overload: moves/copies `c` into a std::shared_ptr<Constraint> and forwards
// to the shared_ptr overload of add_constraint.
4427 template<
typename Constra
int>
4430 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
// Adds constraint `c` together with a mapper `map` whose result is passed to the
// constraint's check/complete calls (map() for void keys, map(key) for keyed TTs).
// Requires Constraint::key_type to equal keyT. For keyed TTs a listener is registered on
// `c` that calls release_constraint(cid, keys).
4436 template<
typename Constra
int,
typename Mapper>
4438 static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
4439 std::size_t cid = constraints_check.size();
4440 if constexpr(ttg::meta::is_void_v<keyT>) {
4442 constraints_check.push_back([map, c,
this](){
return c->check(map(),
this); });
4443 constraints_complete.push_back([map, c,
this](){ c->complete(map(),
this);
return true; });
4445 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4446 constraints_check.push_back([map, c,
this](
const keyT& key){
return c->check(key, map(key),
this); });
4447 constraints_complete.push_back([map, c,
this](
const keyT& key){ c->complete(key, map(key),
this);
return true; });
4454 template<
typename Constra
int,
typename Mapper>
4457 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4463 MPI_Comm_rank(MPI_COMM_WORLD, &
rank);
4466 auto &world_impl = world.
impl();
4470 auto tp = world_impl.taskpool();
4476 std::vector<static_set_arg_fct_arg_t> tmp;
4477 for (
auto it = se.first; it != se.second;) {
4479 tmp.push_back(it->second);
4484 for (
auto it : tmp) {
4487 std::get<1>(it),
", ", std::get<2>(it),
")");
4488 int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
4489 std::get<0>(it), NULL);
4491 free(std::get<1>(it));
4514 bool do_release =
true;
4520 : copy_to_remove(h.copy_to_remove)
4522 h.copy_to_remove =
nullptr;
4528 std::swap(copy_to_remove, h.copy_to_remove);
4533 if (
nullptr != copy_to_remove) {
4541 template <
typename Value>
4542 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&>
operator()(Value &&value) {
4543 constexpr
auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4544 using value_type = std::remove_reference_t<Value>;
4545 static_assert(value_is_rvref ||
4546 std::is_copy_constructible_v<std::decay_t<Value>>,
4547 "Data sent without being moved must be copy-constructible!");
4550 if (
nullptr == caller) {
4551 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4556 value_type *value_ptr = &value;
4557 if (
nullptr == copy) {
4565 value_ptr =
reinterpret_cast<value_type *
>(copy->
get_ptr());
4566 copy_to_remove = copy;
4568 if constexpr (value_is_rvref) {
4576 if constexpr (value_is_rvref)
4577 return std::move(*value_ptr);
4582 template<
typename Value>
4585 if (
nullptr == caller) {
4586 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4590 if (
nullptr == copy) {
4595 copy_to_remove = copy;
4607 template <
typename Value>
4610 if (
nullptr == caller) {
4611 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4615 const Value *value_ptr = &value;
4616 if (
nullptr == copy) {
4624 value_ptr =
reinterpret_cast<Value *
>(copy->
get_ptr());
4625 copy_to_remove = copy;
#define TTG_OP_ASSERT_EXECUTABLE()
Edge is used to connect In and Out terminals.
A base class for all template tasks.
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
auto get_instance_id() const
virtual void make_executable()=0
Marks this executable.
const std::string & get_name() const
Gets the name of this operation.
void register_input_terminals(terminalsT &terms, const namesT &names)
const TTBase * ttg_ptr() const
void register_output_terminals(terminalsT &terms, const namesT &names)
A complete version of void.
Base class for implementation-specific Worlds.
WorldImplBase(int size, int rank)
bool is_valid(void) const
Represents a device in a specific execution space.
void print_incomplete_tasks()
void add_constraint(Constraint c, Mapper &&map)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
ttg::World get_world() const override final
static constexpr int numinvals
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
void set_priomap(Priomap &&pm)
void set_input_reducer(Reducer &&reducer)
std::tuple_element_t< i, input_terminals_type > * in()
void set_devicemap(Devicemap &&dm)
void set_keymap(Keymap &&km)
keymap setter
void add_constraint(std::shared_ptr< Constraint > c)
void finalize_argstream_from_msg(void *data, std::size_t size)
void set_input_reducer(Reducer &&reducer, std::size_t size)
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
bool check_constraints(task_t *task)
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
task_t * create_new_task(const Key &key)
output_terminalsT output_terminals_type
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
static auto & get(InTuple &&intuple)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
void set_defer_writer(bool value)
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
virtual void release() override
bool get_defer_writer(bool value)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
static void ht_iter_cb(void *item, void *cb_data)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
parsec_thread_mempool_t * get_task_mempool(void)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
const auto & get_output_terminals() const
static constexpr bool derived_has_device_op()
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
decltype(keymap) const & get_keymap() const
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
void set_static_argstream_size(std::size_t size)
static constexpr const ttg::Runtime runtime
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
static resultT get(InTuple &&intuple)
static constexpr bool derived_has_cuda_op()
void register_static_op_function(void)
void add_constraint(Constraint &&c)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
void argstream_set_size_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
void set_arg_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
void make_executable() override
Marks this executable.
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
decltype(priomap) const & get_priomap() const
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
std::tuple_element_t< i, output_terminalsT > * out()
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
static constexpr bool derived_has_hip_op()
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
void copy_mark_pushout(const Value &value)
void get_from_pull_msg(void *data, std::size_t size)
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
static constexpr bool derived_has_level_zero_op()
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
actual_input_tuple_type input_args_type
virtual void execute() override
static constexpr int parsec_ttg_rma_tag()
void decrement_inflight_msg()
WorldImpl & operator=(const WorldImpl &other)=delete
const ttg::Edge & ctl_edge() const
void increment_inflight_msg()
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
bool mpi_support(ttg::ExecutionSpace space)
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
virtual bool profiling() override
virtual void dag_off() override
virtual void fence_impl(void) override
virtual void dag_on(const std::string &filename) override
static constexpr int parsec_ttg_tag()
virtual void final_task() override
virtual void destroy() override
virtual void profile_on() override
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
auto * execution_stream()
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
bool complete_transfer(void)
constexpr auto data(C &c) -> decltype(c.data())
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
void set_current(int device, Stream stream)
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
bool all_devices_peer_access
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
std::size_t max_inline_size
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
void foreach_parsec_data(Value &&value, Fn &&fn)
int ttg_device_to_parsec_device(const ttg::device::Device &device)
const parsec_symbol_t parsec_taskclass_param1
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
constexpr const int PARSEC_TTG_MAX_AM_SIZE
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
void transfer_ownership_impl(T &&arg, int device)
const parsec_symbol_t parsec_taskclass_param2
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * create_new_datacopy(Value &&value)
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
void release_data_copy(ttg_data_copy_t *copy)
const parsec_symbol_t parsec_taskclass_param3
const parsec_symbol_t parsec_taskclass_param0
this contains PaRSEC-based TTG functionality
void ttg_fence(ttg::World world)
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
std::mutex static_map_mutex
void ttg_register_callback(ttg::World world, Callback &&callback)
ttg::Edge & ttg_ctl_edge(ttg::World world)
void make_executable_hook(ttg::World &)
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
ttg::World ttg_default_execution_context()
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
void ttg_execute(ttg::World world)
void ttg_sum(ttg::World world, double &value)
top-level TTG namespace contains runtime-neutral functionality
ExecutionSpace
denotes task execution space
int size(World world=default_execution_context())
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
World default_execution_context()
Accesses the default backend's default execution context.
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
bool tracing()
returns whether tracing is enabled
int rank(World world=default_execution_context())
ttg::World & get_default_world()
void trace(const T &t, const Ts &... ts)
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
value_copy_handler(value_copy_handler &&h)
value_copy_handler & operator=(value_copy_handler &&h)
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
value_copy_handler & operator=(const value_copy_handler &h)=delete
value_copy_handler()=default
Computes hash values for objects of type T.
task that can be resumed after some events occur
parsec_hash_table_t task_constraint_table
parsec_hash_table_t tasks_table
parsec_gpu_task_t * gpu_task
static constexpr std::size_t max_payload_size
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
unsigned char bytes[max_payload_size]
parsec_task_t parsec_task
ttg_data_copy_t ** copies
ttg_parsec_data_flags data_flags
parsec_hash_table_item_t tt_ht_item
std::array< stream_info_t, num_streams > streams
ttg_data_copy_t * copies[num_copies]
lvalue_reference_type value_ref
static void drop_all_ptr()
static void release_task(parsec_ttg_task_base_t *task_base)
static constexpr int mutable_tag
parsec_task_t * get_next_task() const
virtual void * get_ptr()=0
void set_next_task(parsec_task_t *task)
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER