2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
13 #define TTG_PARSEC_DEFER_WRITER false
15 #include "ttg/config.h"
20 #include "../../ttg.h"
56 #include <experimental/type_traits>
71 #if defined(TTG_HAVE_MPI)
73 #if defined(TTG_HAVE_MPIEXT)
80 #include <parsec/class/parsec_hash_table.h>
81 #include <parsec/data_internal.h>
82 #include <parsec/execution_stream.h>
83 #include <parsec/interfaces/interface.h>
84 #include <parsec/mca/device/device.h>
85 #include <parsec/parsec_comm_engine.h>
86 #include <parsec/parsec_internal.h>
87 #include <parsec/scheduling.h>
88 #include <parsec/remote_dep.h>
90 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
91 #include <parsec/mca/device/cuda/device_cuda.h>
93 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
94 #include <parsec/mca/device/hip/device_hip.h>
96 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
97 #include <parsec/mca/device/level_zero/device_level_zero.h>
100 #include <parsec/mca/device/device_gpu.h>
101 #if defined(PARSEC_PROF_TRACE)
102 #include <parsec/profiling.h>
103 #undef PARSEC_TTG_PROFILE_BACKEND
104 #if defined(PARSEC_PROF_GRAPHER)
105 #include <parsec/parsec_prof_grapher.h>
111 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
112 #include <unordered_set>
123 #include <boost/type_index.hpp>
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
149 uint64_t
op_id = std::numeric_limits<uint64_t>::max();
// Forward declaration of the teardown hook that unregisters this backend's
// comm-engine tags (its definition checks parsec_ce.tag_unregister before
// doing so). The definition appears later in this header; the world teardown
// calls it directly with nullptr and also registers it through
// parsec_context_at_fini(). The void* argument is unused by these callers.
170 static void unregister_parsec_tags(
void *_);
183 uint32_t taskpool_id,
188 :
tt_id(fn_id, taskpool_id,
tt_id, param_id, sender, num_keys)
194 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag,
void *
data,
long unsigned int size,
195 int src_rank,
void *obj) {
197 parsec_taskpool_t *tp = NULL;
199 uint64_t op_id = msg->
op_id;
206 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
207 static_set_arg_fct = op_pair.first;
208 static_set_arg_fct(
data,
size, op_pair.second);
209 tp->tdm.module->incoming_message_end(tp, NULL);
211 }
catch (
const std::out_of_range &e) {
212 void *data_cpy = malloc(
size);
213 assert(data_cpy != 0);
216 ", ", op_id,
", ", data_cpy,
", ",
size,
")");
// Forward declaration of an active-message callback with the PaRSEC comm-engine
// tag-callback signature; the definition appears later in this header.
// That definition reads `msg` as a std::intptr_t holding the address of a
// std::function<void(void)> and returns PARSEC_SUCCESS.
// NOTE(review): presumably the function object is invoked (and released) on
// receipt — confirm against the full definition.
223 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
224 int src,
void *cb_data);
227 static bool im =
false;
238 bool _task_profiling;
240 mpi_space_support = {
true,
false,
false};
242 int query_comm_size() {
244 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
248 int query_comm_rank() {
250 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
254 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine,
void *user_data)
260 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine,
void *user_data)
267 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
268 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
269 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
270 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
273 WorldImpl(
int *argc,
char **argv[],
int ncores, parsec_context_t *c =
nullptr)
276 , own_ctx(c == nullptr)
277 #if defined(PARSEC_PROF_TRACE)
278 , profiling_array(nullptr)
279 , profiling_array_size(0)
281 , _dag_profiling(false)
282 , _task_profiling(false)
285 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
289 #
if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
290 || MPIX_Query_cuda_support()
297 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
298 || MPIX_Query_hip_support()
304 #if defined(PARSEC_PROF_TRACE)
305 if(parsec_profile_enabled) {
307 #if defined(PARSEC_TTG_PROFILE_BACKEND)
308 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_SET_ARG_IMPL",
"fill:000000", 0, NULL,
309 (
int*)&parsec_ttg_profile_backend_set_arg_start,
310 (
int*)&parsec_ttg_profile_backend_set_arg_end);
311 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_BCAST_ARG_IMPL",
"fill:000000", 0, NULL,
312 (
int*)&parsec_ttg_profile_backend_bcast_arg_start,
313 (
int*)&parsec_ttg_profile_backend_bcast_arg_end);
314 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_DATACOPY",
"fill:000000",
315 sizeof(
size_t),
"size{int64_t}",
316 (
int*)&parsec_ttg_profile_backend_allocate_datacopy,
317 (
int*)&parsec_ttg_profile_backend_free_datacopy);
322 if( NULL != parsec_ce.tag_register) {
336 assert(
nullptr == tpool);
337 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
338 tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
340 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
341 tpool->taskpool_name = strdup(
"TTG Taskpool");
342 parsec_taskpool_reserve_id(tpool);
344 tpool->devices_index_mask = 0;
345 for(
int i = 0; i < (int)parsec_nb_devices; i++) {
346 parsec_device_module_t *device = parsec_mca_device_get(i);
347 if( NULL == device )
continue;
348 tpool->devices_index_mask |= (1 << device->device_index);
351 #ifdef TTG_USE_USER_TERMDET
352 parsec_termdet_open_module(tpool,
"user_trigger");
354 parsec_termdet_open_dyn_module(tpool);
362 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
365 #if defined(PARSEC_PROF_TRACE)
366 tpool->profiling_array = profiling_array;
375 parsec_taskpool_started =
false;
395 MPI_Comm
comm()
const {
return MPI_COMM_WORLD; }
398 if (!parsec_taskpool_started) {
399 parsec_enqueue(ctx, tpool);
400 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
401 tpool->tdm.module->taskpool_ready(tpool);
402 [[maybe_unused]]
auto ret = parsec_context_start(ctx);
404 parsec_taskpool_started =
true;
409 #if defined(PARSEC_PROF_TRACE)
413 tpool->profiling_array =
nullptr;
415 assert(NULL != tpool->tdm.monitor);
416 tpool->tdm.module->unmonitor_taskpool(tpool);
417 parsec_taskpool_free(tpool);
423 if (parsec_taskpool_started) {
425 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
426 ttg::trace(
"ttg_parsec(", this->
rank(),
"): final waiting for completion");
428 parsec_context_wait(ctx);
430 parsec_taskpool_wait(tpool);
436 unregister_parsec_tags(
nullptr);
438 parsec_context_at_fini(unregister_parsec_tags,
nullptr);
440 #if defined(PARSEC_PROF_TRACE)
441 if(
nullptr != profiling_array) {
442 free(profiling_array);
443 profiling_array =
nullptr;
444 profiling_array_size = 0;
447 if (own_ctx) parsec_fini(&ctx);
463 virtual void dag_on(
const std::string &filename)
override {
464 #if defined(PARSEC_PROF_GRAPHER)
465 if(!_dag_profiling) {
467 size_t len = strlen(filename.c_str())+32;
468 char ext_filename[len];
469 snprintf(ext_filename, len,
"%s-%d.dot", filename.c_str(),
rank());
470 parsec_prof_grapher_init(ctx, ext_filename);
471 _dag_profiling =
true;
474 ttg::print(
"Error: requested to create '", filename,
"' to create a DAG of tasks,\n"
475 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
480 #if defined(PARSEC_PROF_GRAPHER)
482 parsec_prof_grapher_fini();
483 _dag_profiling =
false;
489 #if defined(PARSEC_PROF_TRACE)
490 _task_profiling =
false;
495 #if defined(PARSEC_PROF_TRACE)
496 _task_profiling =
true;
500 virtual bool profiling()
override {
return _task_profiling; }
503 return mpi_space_support[
static_cast<std::size_t
>(space)];
507 #ifdef TTG_USE_USER_TERMDET
508 if(parsec_taskpool_started) {
510 parsec_taskpool_started =
false;
515 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
518 #if defined(PARSEC_PROF_TRACE)
519 std::stringstream ss;
520 build_composite_name_rec(t->
ttg_ptr(), ss);
527 #if defined(PARSEC_PROF_TRACE)
528 void build_composite_name_rec(
const ttg::TTBase *t, std::stringstream &ss) {
531 build_composite_name_rec(t->
ttg_ptr(), ss);
535 void register_new_profiling_event(
const char *name,
int position) {
536 if(2*position >= profiling_array_size) {
537 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
538 profiling_array = (
int*)realloc((
void*)profiling_array,
539 new_profiling_array_size *
sizeof(int));
540 memset((
void*)&profiling_array[profiling_array_size], 0,
sizeof(
int)*(new_profiling_array_size - profiling_array_size));
541 profiling_array_size = new_profiling_array_size;
542 tpool->profiling_array = profiling_array;
545 assert(0 == tpool->profiling_array[2*position]);
546 assert(0 == tpool->profiling_array[2*position+1]);
550 parsec_profiling_add_dictionary_keyword(name,
"fill:000000", 64,
"key{char[64]}",
551 (
int*)&tpool->profiling_array[2*position],
552 (
int*)&tpool->profiling_array[2*position+1]);
558 if (!parsec_taskpool_started) {
559 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool has not been started, fence is a simple MPI_Barrier");
563 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool is ready for completion");
565 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
567 parsec_taskpool_wait(tpool);
580 parsec_context_t *ctx =
nullptr;
581 bool own_ctx =
false;
582 parsec_taskpool_t *tpool =
nullptr;
583 bool parsec_taskpool_started =
false;
584 #if defined(PARSEC_PROF_TRACE)
585 int *profiling_array;
586 std::size_t profiling_array_size;
590 static void unregister_parsec_tags(
void *_pidx)
592 if(NULL != parsec_ce.tag_unregister) {
601 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
609 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
617 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
625 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
635 if (task ==
nullptr ||
ptr ==
nullptr) {
640 if (NULL != copy && copy->get_ptr() ==
ptr) {
650 if (task ==
nullptr ||
ptr ==
nullptr) {
655 if (NULL != copy && copy->get_ptr() ==
ptr) {
663 if (task ==
nullptr || copy ==
nullptr) {
667 if (MAX_PARAM_COUNT < task->data_count) {
668 throw std::logic_error(
"Too many data copies, check MAX_PARAM_COUNT!");
680 if (copy == task->
copies[i]) {
690 task->
copies[i] =
nullptr;
694 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
695 #warning "ttg::PaRSEC enables data copy tracking"
696 static std::unordered_set<ttg_data_copy_t *> pending_copies;
697 static std::mutex pending_copies_mutex;
699 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
700 static int64_t parsec_ttg_data_copy_uid = 0;
703 template <
typename Value>
705 using value_type = std::decay_t<Value>;
708 std::is_constructible_v<value_type, decltype(value)>) {
709 copy =
new value_type(std::forward<Value>(value));
714 throw std::logic_error(
"Trying to copy-construct data that is not copy-constructible!");
716 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
719 copy->size =
sizeof(Value);
720 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
722 static_cast<uint64_t
>(copy->uid),
723 PROFILE_OBJECT_ID_NULL, ©->size,
724 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
727 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
729 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
730 auto rc = pending_copies.insert(copy);
731 assert(std::get<1>(rc));
738 template <std::size_t... IS,
typename Key = keyT>
740 int junk[] = {0, (invoke_pull_terminal<IS>(
741 std::get<IS>(input_terminals), key, task),
747 template<
typename TT, std::
size_t I>
749 if constexpr(!
std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
755 template<
typename TT, std::size_t... Is>
758 int junk[] = {0, (transfer_ownership_impl<TT, Is>(me->
copies[Is], device), 0)...};
762 template<
typename TT>
763 inline parsec_hook_return_t
hook(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
765 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
768 return me->template invoke_op<ttg::ExecutionSpace::Host>();
771 template<
typename TT>
772 inline parsec_hook_return_t
hook_cuda(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
775 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
777 std::cerr <<
"CUDA hook called without having a CUDA op!" << std::endl;
778 return PARSEC_HOOK_RETURN_ERROR;
782 template<
typename TT>
783 inline parsec_hook_return_t
hook_hip(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
786 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
788 std::cerr <<
"HIP hook called without having a HIP op!" << std::endl;
789 return PARSEC_HOOK_RETURN_ERROR;
793 template<
typename TT>
794 inline parsec_hook_return_t
hook_level_zero(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
797 return me->template invoke_op<ttg::ExecutionSpace::L0>();
799 std::cerr <<
"L0 hook called without having a L0 op!" << std::endl;
800 return PARSEC_HOOK_RETURN_ERROR;
805 template<
typename TT>
806 inline parsec_hook_return_t
evaluate_cuda(
const parsec_task_t *parsec_task) {
809 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
811 return PARSEC_HOOK_RETURN_NEXT;
815 template<
typename TT>
816 inline parsec_hook_return_t
evaluate_hip(
const parsec_task_t *parsec_task) {
819 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
821 return PARSEC_HOOK_RETURN_NEXT;
825 template<
typename TT>
829 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
831 return PARSEC_HOOK_RETURN_NEXT;
836 template <
typename KeyT,
typename ActivationCallbackT>
838 std::vector<KeyT> _keylist;
839 std::atomic<int> _outstanding_transfers;
840 ActivationCallbackT _cb;
845 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
848 int left = --_outstanding_transfers;
850 _cb(std::move(_keylist), _copy);
857 template <
typename ActivationT>
858 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
859 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl,
size_t size,
int remote,
861 parsec_ce.mem_unregister(&lreg);
862 ActivationT *activation =
static_cast<ActivationT *
>(cb_data);
863 if (activation->complete_transfer()) {
866 return PARSEC_SUCCESS;
869 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
870 int src,
void *cb_data) {
871 std::intptr_t *fn_ptr =
static_cast<std::intptr_t *
>(msg);
872 std::function<void(
void)> *fn =
reinterpret_cast<std::function<
void(
void)
> *>(*fn_ptr);
875 return PARSEC_SUCCESS;
878 template <
typename FuncT>
879 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
880 int src,
void *cb_data) {
881 std::intptr_t *iptr =
static_cast<std::intptr_t *
>(msg);
882 FuncT *fn_ptr =
reinterpret_cast<FuncT *
>(*iptr);
885 return PARSEC_SUCCESS;
899 }
else if (readers == 1) {
905 if (1 == readers || readers == copy->
mutable_tag) {
906 std::atomic_thread_fence(std::memory_order_acquire);
919 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
921 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
922 size_t rc = pending_copies.erase(copy);
926 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
930 static_cast<uint64_t
>(copy->uid),
931 PROFILE_OBJECT_ID_NULL, ©->size,
932 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
940 template <
typename Value>
943 bool replace =
false;
945 assert(readers != 0);
949 bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->
defer_writer;
975 }
else if (!readonly) {
989 if (1 == copy_in->
num_readers() && !defer_writer) {
996 std::atomic_thread_fence(std::memory_order_release);
1010 if (NULL == copy_res) {
1012 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1019 for (
int i = 0; i < deferred_op->
data_count; ++i) {
1020 if (deferred_op->
copies[i] == copy_in) {
1021 deferred_op->
copies[i] = new_copy;
1034 copy_res = new_copy;
1038 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<Value>>().pretty_name() +
" but the type is not copyable");
1047 if (
detail::initialized_mpi())
throw std::runtime_error(
"ttg_parsec::ttg_initialize: can only be called once");
1050 int mpi_initialized;
1051 MPI_Initialized(&mpi_initialized);
1052 if (!mpi_initialized) {
1054 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
1056 throw std::runtime_error(
"ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1069 for (
int i = 0; i < parsec_nb_devices; ++i) {
1070 bool is_gpu = parsec_mca_device_is_gpu(i);
1074 throw std::runtime_error(
"PaRSEC: Found non-GPU device in GPU ID range!");
1079 const char* ttg_max_inline_cstr = std::getenv(
"TTG_MAX_INLINE");
1080 if (
nullptr != ttg_max_inline_cstr) {
1081 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
1087 bool all_peer_access =
true;
1089 for (
int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1090 parsec_device_module_t *idevice = parsec_mca_device_get(i);
1091 if (PARSEC_DEV_IS_GPU(idevice->type)) {
1092 parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
1093 for (
int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1095 parsec_device_module_t *jdevice = parsec_mca_device_get(j);
1096 if (PARSEC_DEV_IS_GPU(jdevice->type)) {
1097 all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true :
false;
1112 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1121 template <
typename T>
1123 world.
impl().register_ptr(
ptr);
1126 template <
typename T>
1128 world.
impl().register_ptr(std::move(
ptr));
1132 world.
impl().register_status(status_ptr);
1135 template <
typename Callback>
1137 world.
impl().register_callback(std::forward<Callback>(callback));
1143 double result = 0.0;
1144 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.
impl().comm());
1149 MPI_Barrier(world.
impl().comm());
1154 template <
typename T>
1157 if (world.
rank() == source_rank) {
1160 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.
impl().comm());
1162 unsigned char *buf =
new unsigned char[BUFLEN];
1163 if (world.
rank() == source_rank) {
1166 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.
impl().comm());
1167 if (world.
rank() != source_rank) {
1180 parsec_task_class_t
self;
1185 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs, ttg::ExecutionSpace Space>
1189 static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1190 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1192 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1195 static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1196 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1197 static_assert((ttg::meta::none_has_reference_v<input_valueTs>),
"Input typelist cannot contain reference types");
1198 static_assert(ttg::meta::is_none_Void_v<input_valueTs>,
"ttg::Void is for internal use only, do not use it");
1200 parsec_mempool_t mempools;
1203 template <
typename T>
1204 using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1206 template <
typename T>
1207 using have_hip_op_non_type_t = decltype(T::have_hip_op);
1209 template <
typename T>
1210 using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
// Number of input edges: the size of input_tuple_type (also the length of the
// get_from_pull_msg_fcts dispatch table).
1214 static constexpr
int numinedges = std::tuple_size_v<input_tuple_type>;
// Number of actual inputs: the size of actual_input_tuple_type (the length of
// the set_arg / argstream-size / finalize dispatch tables and input_is_const).
1215 static constexpr
int numins = std::tuple_size_v<actual_input_tuple_type>;
// Number of output terminals: the size of the output_terminalsT tuple.
1216 static constexpr
int numouts = std::tuple_size_v<output_terminalsT>;
// The larger of numins and numouts.
1217 static constexpr
int numflows = std::max(numins, numouts);
1247 ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1249 ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1254 std::tuple_size_v<input_refs_tuple_type>;
1260 template <std::
size_t i,
typename resultT,
typename InTuple>
1261 static resultT
get(InTuple &&intuple) {
1262 return static_cast<resultT
>(std::get<i>(std::forward<InTuple>(intuple)));
1264 template <std::
size_t i,
typename InTuple>
1265 static auto &
get(InTuple &&intuple) {
1266 return std::get<i>(std::forward<InTuple>(intuple));
1276 constexpr
static const size_t task_key_offset =
sizeof(task_t);
1279 output_terminalsT output_terminals;
1285 template <std::size_t... IS>
1286 static constexpr
auto make_set_args_fcts(std::index_sequence<IS...>) {
1287 using resultT = decltype(set_arg_from_msg_fcts);
1288 return resultT{{&TT::set_arg_from_msg<IS>...}};
1290 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_arg_from_msg_fcts =
1291 make_set_args_fcts(std::make_index_sequence<numins>{});
1293 template <std::size_t... IS>
1294 static constexpr
auto make_set_size_fcts(std::index_sequence<IS...>) {
1295 using resultT = decltype(set_argstream_size_from_msg_fcts);
1296 return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
1298 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1299 make_set_size_fcts(std::make_index_sequence<numins>{});
1301 template <std::size_t... IS>
1302 static constexpr
auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1303 using resultT = decltype(finalize_argstream_from_msg_fcts);
1304 return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
1306 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1307 make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
1309 template <std::size_t... IS>
1310 static constexpr
auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1311 using resultT = decltype(get_from_pull_msg_fcts);
1312 return resultT{{&TT::get_from_pull_msg<IS>...}};
1314 constexpr
static std::array<void (TT::*)(
void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1315 make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
1317 template<std::size_t... IS>
1318 constexpr
static auto make_input_is_const(std::index_sequence<IS...>) {
1319 using resultT = decltype(input_is_const);
1320 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1322 constexpr
static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1325 ttg::meta::detail::keymap_t<keyT> keymap;
1326 ttg::meta::detail::keymap_t<keyT> priomap;
1327 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1329 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1331 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = {
nullptr };
// Presumably the per-input static stream goal (expected stream size), one
// entry per input (numins entries).
// NOTE(review): aggregate initialization with a single initializer sets only
// element 0 to std::numeric_limits<std::size_t>::max(); every remaining element
// is value-initialized to 0. If max() is meant as the "no static goal" marker
// for every input, this needs an explicit fill — confirm against the readers
// of static_stream_goal.
1332 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1333 int num_pullins = 0;
1337 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1338 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1347 template <
typename... Args>
1348 auto op(Args &&...args) {
1349 derivedT *derived =
static_cast<derivedT *
>(
this);
1350 using return_type = decltype(derived->op(std::forward<Args>(args)...));
1351 if constexpr (std::is_same_v<return_type,void>) {
1352 derived->op(std::forward<Args>(args)...);
1356 return derived->op(std::forward<Args>(args)...);
1360 template <std::
size_t i,
typename terminalT,
typename Key>
1361 void invoke_pull_terminal(terminalT &
in,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1362 if (
in.is_pull_terminal) {
1363 auto owner =
in.container.owner(key);
1364 if (owner != world.rank()) {
1365 get_pull_terminal_data_from<i>(owner, key);
1368 set_arg<i>(key, (
in.container).get(key));
1373 template <std::
size_t i,
typename Key>
1374 void get_pull_terminal_data_from(
const int owner,
1376 using msg_t = detail::msg_t;
1377 auto &world_impl = world.impl();
1378 parsec_taskpool_t *tp = world_impl.taskpool();
1379 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), tp->taskpool_id,
1384 pos =
pack(key, msg->bytes, pos);
1385 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1386 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1387 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
1388 sizeof(msg_header_t) + pos);
1391 template <std::size_t... IS,
typename Key = keyT>
1392 void invoke_pull_terminals(std::index_sequence<IS...>,
const Key &key, detail::parsec_ttg_task_base_t *task) {
1393 int junk[] = {0, (invoke_pull_terminal<IS>(
1394 std::get<IS>(input_terminals), key, task),
1399 template <std::size_t... IS>
1400 static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1402 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>
> *>(
1403 task->copies[IS]->get_ptr()))...};
1406 #ifdef TTG_HAVE_DEVICE
1410 static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
1411 parsec_gpu_task_t *gpu_task,
1412 parsec_gpu_exec_stream_t *gpu_stream) {
1414 task_t *task = (task_t*)gpu_task->ec;
1416 ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1418 task->dev_ptr->stream = gpu_stream;
1423 auto dev_data = dev_task.promise();
1426 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1427 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1429 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1431 parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
1435 #elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1437 parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
1441 #elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1443 parsec_level_zero_exec_stream_t *stream;
1444 stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
1451 static_op(&task->parsec_task);
1455 auto discard_tmp_flows = [&](){
1456 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1459 const_cast<parsec_flow_t*
>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
1460 task->parsec_task.data[i].data_out->readers = 1;
1466 int rc = PARSEC_HOOK_RETURN_DONE;
1467 if (
nullptr != task->suspended_task_address) {
1469 dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1470 dev_data = dev_task.promise();
1472 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1473 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1474 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1476 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1477 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1480 discard_tmp_flows();
1483 rc = PARSEC_HOOK_RETURN_AGAIN;
1488 discard_tmp_flows();
1493 static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
1495 task_t *task = (task_t*)parsec_task;
1496 if (task->dev_ptr->gpu_task ==
nullptr) {
1499 parsec_gpu_task_t *gpu_task;
1501 gpu_task =
static_cast<parsec_gpu_task_t*
>(std::calloc(1,
sizeof(*gpu_task)));
1502 PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
1503 gpu_task->ec = parsec_task;
1504 gpu_task->task_type = 0;
1505 gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max();
1506 gpu_task->pushout = 0;
1507 gpu_task->submit = &TT::device_static_submit;
1516 task->dev_ptr->gpu_task = gpu_task;
1519 task->parsec_task.chore_mask = PARSEC_DEV_ALL;
1522 task->dev_ptr->task_class = *task->parsec_task.task_class;
1525 static_op(parsec_task);
1529 parsec_task_class_t& tc = task->dev_ptr->task_class;
1532 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1533 tc.in[i] = gpu_task->flow[i];
1534 tc.out[i] = gpu_task->flow[i];
1536 tc.nb_flows = MAX_PARAM_COUNT;
1540 if (tt->devicemap) {
1542 if constexpr (std::is_void_v<keyT>) {
1547 for (
int i = 0; i < MAX_PARAM_COUNT; ++i) {
1549 if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
1550 parsec_data_t *
data = parsec_task->data[i].data_in->original;
1554 if (
data->owner_device == 0) {
1555 parsec_advise_data_on_device(
data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
1562 task->parsec_task.task_class = &task->dev_ptr->task_class;
1565 return PARSEC_HOOK_RETURN_DONE;
1568 std::cerr <<
"EVALUATE called on task with assigned GPU task!" << std::endl;
1571 return PARSEC_HOOK_RETURN_ERROR;
1575 static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
1580 task_t *task = (task_t*)parsec_task;
1582 if (
nullptr == task->suspended_task_address) {
1584 return PARSEC_HOOK_RETURN_DONE;
1588 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1591 ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1593 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1594 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1596 return PARSEC_HOOK_RETURN_DONE;
1599 parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1600 assert(NULL != device);
1602 task->dev_ptr->device = device;
1603 parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1604 parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
1606 switch(device->super.type) {
1608 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1609 case PARSEC_DEV_CUDA:
1613 gpu_task->stage_in = parsec_default_gpu_stage_in;
1614 gpu_task->stage_out = parsec_default_gpu_stage_out;
1615 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1619 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1620 case PARSEC_DEV_HIP:
1622 gpu_task->stage_in = parsec_default_gpu_stage_in;
1623 gpu_task->stage_out = parsec_default_gpu_stage_out;
1624 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1628 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1629 case PARSEC_DEV_LEVEL_ZERO:
1631 gpu_task->stage_in = parsec_default_gpu_stage_in;
1632 gpu_task->stage_out = parsec_default_gpu_stage_out;
1633 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1640 ttg::print_error(task->tt->get_name(),
" : received mismatching device type ", (
int)device->super.type,
" from PaRSEC");
1642 return PARSEC_HOOK_RETURN_DONE;
1646 static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1648 task_t *task = (task_t*)parsec_task;
1649 void* suspended_task_address =
1650 #ifdef TTG_HAVE_COROUTINE
1651 task->suspended_task_address;
1656 if (suspended_task_address ==
nullptr) {
1658 ttT *baseobj = task->tt;
1659 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1662 if (obj->tracing()) {
1663 if constexpr (!ttg::meta::is_void_v<keyT>)
1664 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": executing");
1666 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : executing");
1669 if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1670 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1671 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
1672 }
else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1673 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1674 }
else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1675 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1676 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
1677 }
else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1686 #ifdef TTG_HAVE_COROUTINE
1689 #ifdef TTG_HAVE_DEVICE
1691 ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1695 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1696 task->tt->set_outputs_tls_ptr();
1698 if (coro.completed()) {
1700 suspended_task_address =
nullptr;
1702 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1708 assert(ret.ready());
1709 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1710 task->tt->set_outputs_tls_ptr();
1712 if (ret.completed()) {
1714 suspended_task_address =
nullptr;
1722 for (
auto &event_ptr : events) {
1723 event_ptr->finish();
1727 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1729 task->suspended_task_address = suspended_task_address;
1737 #ifdef TTG_HAVE_COROUTINE
1738 task->suspended_task_address = suspended_task_address;
1740 if (suspended_task_address ==
nullptr) {
1741 ttT *baseobj = task->tt;
1742 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1743 if (obj->tracing()) {
1744 if constexpr (!ttg::meta::is_void_v<keyT>)
1745 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", task->key,
": done executing");
1747 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1751 return PARSEC_HOOK_RETURN_DONE;
1754 static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1755 task_t *task =
static_cast<task_t*
>(parsec_task);
1757 void* suspended_task_address =
1758 #ifdef TTG_HAVE_COROUTINE
1759 task->suspended_task_address;
1763 if (suspended_task_address ==
nullptr) {
1764 ttT *baseobj = (
ttT *)task->object_ptr;
1765 derivedT *obj = (derivedT *)task->object_ptr;
1768 if constexpr (!ttg::meta::is_void_v<keyT>) {
1769 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1770 }
else if constexpr (ttg::meta::is_void_v<keyT>) {
1777 #ifdef TTG_HAVE_COROUTINE
1779 assert(ret.ready());
1781 if (ret.completed()) {
1783 suspended_task_address =
nullptr;
1792 task->suspended_task_address = suspended_task_address;
1794 if (suspended_task_address) {
1797 return PARSEC_HOOK_RETURN_AGAIN;
1800 return PARSEC_HOOK_RETURN_DONE;
1803 template <std::
size_t i>
1804 static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1805 using rtask_t = detail::reducer_task_t;
1806 using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1807 constexpr
const bool val_is_void = ttg::meta::is_void_v<value_t>;
1808 constexpr
const bool input_is_const = std::is_const_v<value_t>;
1809 rtask_t *rtask = (rtask_t*)parsec_task;
1810 task_t *parent_task =
static_cast<task_t*
>(rtask->parent_task);
1811 ttT *baseobj = parent_task->tt;
1812 derivedT *obj =
static_cast<derivedT *
>(baseobj);
1814 auto& reducer = std::get<i>(baseobj->input_reducers);
1818 if (obj->tracing()) {
1819 if constexpr (!ttg::meta::is_void_v<keyT>)
1820 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": reducer executing");
1822 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : reducer executing");
1826 detail::ttg_data_copy_t *target_copy;
1827 target_copy = parent_task->copies[i];
1828 assert(val_is_void ||
nullptr != target_copy);
1831 std::size_t
size = 0;
1832 assert(parent_task->streams[i].reduce_count > 0);
1833 if (rtask->is_first) {
1834 if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1836 if (obj->tracing()) {
1837 if constexpr (!ttg::meta::is_void_v<keyT>)
1838 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": first reducer empty");
1840 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : first reducer empty");
1843 return PARSEC_HOOK_RETURN_DONE;
1851 if constexpr(!val_is_void) {
1853 detail::ttg_data_copy_t *source_copy;
1854 parsec_list_item_t *item;
1855 item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
1856 if (
nullptr == item) {
1860 source_copy = ((detail::ttg_data_copy_self_t *)(item))->
self;
1861 assert(target_copy->num_readers() == target_copy->mutable_tag);
1862 assert(source_copy->num_readers() > 0);
1863 reducer(*
reinterpret_cast<std::decay_t<value_t> *
>(target_copy->get_ptr()),
1864 *
reinterpret_cast<std::decay_t<value_t> *
>(source_copy->get_ptr()));
1866 }
else if constexpr(val_is_void) {
1870 size = ++parent_task->streams[i].size;
1872 }
while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1876 bool complete = (
size >= parent_task->streams[i].goal);
1881 if (complete && c == 0) {
1882 if constexpr(input_is_const) {
1884 target_copy->reset_readers();
1887 parent_task->remove_from_hash =
true;
1888 parent_task->release_task(parent_task);
1893 if (obj->tracing()) {
1894 if constexpr (!ttg::meta::is_void_v<keyT>)
1895 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : ", parent_task->key,
": done executing");
1897 ttg::trace(obj->get_world().rank(),
":", obj->get_name(),
" : done executing");
1900 return PARSEC_HOOK_RETURN_DONE;
1905 template <
typename T>
1906 uint64_t
unpack(T &obj,
void *_bytes, uint64_t pos) {
1908 uint64_t payload_size;
1909 if constexpr (!dd_t::serialize_size_is_const) {
1912 payload_size = dd_t::payload_size(&obj);
1914 pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
1918 template <
typename T>
1921 uint64_t payload_size = dd_t::payload_size(&obj);
1924 copy->iovec_reset();
1927 if constexpr (!dd_t::serialize_size_is_const) {
1930 pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
1936 "Trying to unpack as message that does not hold enough bytes to represent a single header");
1938 derivedT *obj =
reinterpret_cast<derivedT *
>(bop);
1939 switch (hd->
fn_id) {
1943 assert(hd->
param_id < obj->set_arg_from_msg_fcts.size());
1944 auto member = obj->set_arg_from_msg_fcts[hd->
param_id];
1954 assert(hd->
param_id < obj->set_argstream_size_from_msg_fcts.size());
1955 auto member = obj->set_argstream_size_from_msg_fcts[hd->
param_id];
1961 assert(hd->
param_id < obj->finalize_argstream_from_msg_fcts.size());
1962 auto member = obj->finalize_argstream_from_msg_fcts[hd->
param_id];
1968 assert(hd->
param_id < obj->get_from_pull_msg_fcts.size());
1969 auto member = obj->get_from_pull_msg_fcts[hd->
param_id];
1980 auto &world_impl = world.impl();
1981 parsec_execution_stream_s *es = world_impl.execution_stream();
1982 int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
1983 return &mempools.thread_mempools[index];
1986 template <
size_t i,
typename valueT>
1990 parsec_execution_stream_s *es = world.impl().execution_stream();
1992 dummy =
new (parsec_thread_mempool_allocate(mempool))
task_t(mempool, &this->
self,
this);
2000 dummy->
parsec_task.taskpool = world.impl().taskpool();
2007 parsec_task_t *task_ring =
nullptr;
2008 for (
auto &&key : keylist) {
2010 if constexpr (std::is_copy_constructible_v<valueT>) {
2011 set_arg_local_impl<i>(key, *
reinterpret_cast<valueT *
>(copy->
get_ptr()), copy, &task_ring);
2015 static_assert(!std::is_reference_v<valueT>);
2017 set_arg_local_impl<i>(key, std::move(*
reinterpret_cast<valueT *
>(copy->
get_ptr())), copy, &task_ring);
2019 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<valueT>>().pretty_name() +
" but the type is not copyable");
2024 if (
nullptr != task_ring) {
2025 auto &world_impl = world.impl();
2026 parsec_task_t *vp_task_ring[1] = { task_ring };
2027 __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2034 complete_task_and_release(es, &dummy->
parsec_task);
2035 parsec_thread_mempool_free(mempool, &dummy->
parsec_task);
2047 template <std::
size_t i>
2049 using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2051 msg_t *msg =
static_cast<msg_t *
>(
data);
2052 if constexpr (!ttg::meta::is_void_v<keyT>) {
2056 uint64_t key_end_pos;
2057 std::vector<keyT> keylist;
2058 int num_keys = msg->tt_id.num_keys;
2059 keylist.reserve(num_keys);
2060 auto rank = world.rank();
2061 for (
int k = 0; k < num_keys; ++k) {
2063 pos =
unpack(key, msg->bytes, pos);
2064 assert(keymap(key) ==
rank);
2065 keylist.push_back(std::move(key));
2071 if constexpr (!ttg::meta::is_void_v<valueT>) {
2072 using decvalueT = std::decay_t<valueT>;
2073 int32_t num_iovecs = msg->tt_id.num_iovecs;
2078 using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2081 metadata_t metadata;
2082 pos =
unpack(metadata, msg->bytes, pos);
2091 parsec_gpu_data_copy_t* gpu_elem;
2092 gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2095 while (i < parsec_nb_devices) {
2096 if (
nullptr == gpu_elem) {
2097 gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2098 gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2099 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2100 gpu_elem->version = 0;
2101 gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2103 if (
nullptr == gpu_elem->device_private) {
2104 gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2105 if (
nullptr == gpu_elem->device_private) {
2114 pos =
unpack(*
static_cast<decvalueT *
>(copy->
get_ptr()), msg->bytes, pos);
2119 if (num_iovecs == 0) {
2120 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2125 int remote = msg->tt_id.sender;
2126 assert(remote < world.size());
2128 auto &val = *
static_cast<decvalueT *
>(copy->
get_ptr());
2130 bool inline_data = msg->tt_id.inline_data;
2134 auto handle_iovecs_fn =
2135 [&](
auto&& iovecs) {
2139 for (
auto &&iov : iovecs) {
2141 std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes);
2142 pos += iov.num_bytes;
2146 parsec_ce_tag_t cbtag;
2147 std::memcpy(&cbtag, msg->bytes + pos,
sizeof(cbtag));
2148 pos +=
sizeof(cbtag);
2153 set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2154 this->world.impl().decrement_inflight_msg();
2157 using ActivationT = std::decay_t<decltype(*activation)>;
2159 for (
auto &&iov : iovecs) {
2161 parsec_ce_mem_reg_handle_t rreg;
2162 int32_t rreg_size_i;
2163 std::memcpy(&rreg_size_i, msg->bytes + pos,
sizeof(rreg_size_i));
2164 pos +=
sizeof(rreg_size_i);
2165 rreg =
static_cast<parsec_ce_mem_reg_handle_t
>(msg->bytes + pos);
2169 std::intptr_t fn_ptr;
2170 std::memcpy(&fn_ptr, msg->bytes + pos,
sizeof(fn_ptr));
2171 pos +=
sizeof(fn_ptr);
2174 parsec_ce_mem_reg_handle_t lreg;
2176 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2177 iov.num_bytes, &lreg, &lreg_size);
2178 world.impl().increment_inflight_msg();
2181 parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
2182 &detail::get_complete_cb<ActivationT>, activation,
2184 cbtag, &fn_ptr,
sizeof(std::intptr_t));
2190 handle_iovecs_fn(descr.get_data(val));
2196 assert(num_iovecs == nv);
2200 set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2204 }
else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2205 for (
auto &&key : keylist) {
2206 set_arg<i, keyT, ttg::Void>(key,
ttg::Void{});
2210 }
else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2211 using decvalueT = std::decay_t<valueT>;
2214 unpack(val, msg->bytes, 0);
2215 set_arg<i, keyT, valueT>(std::move(val));
2217 }
else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2218 set_arg<i, keyT, ttg::Void>(
ttg::Void{});
2224 template <std::
size_t i>
2227 msg_t *msg =
static_cast<msg_t *
>(
data);
2228 if constexpr (!ttg::meta::is_void_v<keyT>) {
2231 auto rank = world.rank();
2233 pos =
unpack(key, msg->bytes, pos);
2234 assert(keymap(key) ==
rank);
2235 finalize_argstream<i>(key);
2237 auto rank = world.rank();
2238 assert(keymap() ==
rank);
2239 finalize_argstream<i>();
2243 template <std::
size_t i>
2246 auto msg =
static_cast<msg_t *
>(
data);
2248 if constexpr (!ttg::meta::is_void_v<keyT>) {
2250 auto rank = world.rank();
2252 pos =
unpack(key, msg->bytes, pos);
2253 assert(keymap(key) ==
rank);
2254 std::size_t argstream_size;
2255 pos =
unpack(argstream_size, msg->bytes, pos);
2256 set_argstream_size<i>(key, argstream_size);
2258 auto rank = world.rank();
2259 assert(keymap() ==
rank);
2260 std::size_t argstream_size;
2261 pos =
unpack(argstream_size, msg->bytes, pos);
2262 set_argstream_size<i>(argstream_size);
2266 template <std::
size_t i>
2269 msg_t *msg =
static_cast<msg_t *
>(
data);
2270 auto &
in = std::get<i>(input_terminals);
2271 if constexpr (!ttg::meta::is_void_v<keyT>) {
2275 pos =
unpack(key, msg->bytes, pos);
2276 set_arg<i>(key, (
in.container).get(key));
2280 template <std::
size_t i,
typename Key,
typename Value>
2281 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2282 const Key &key, Value &&value) {
2283 set_arg_local_impl<i>(key, std::forward<Value>(value));
2286 template <std::
size_t i,
typename Key = keyT,
typename Value>
2287 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2289 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2292 template <std::
size_t i,
typename Key,
typename Value>
2293 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2294 const Key &key,
const Value &value) {
2295 set_arg_local_impl<i>(key, value);
2298 template <std::
size_t i,
typename Key = keyT,
typename Value>
2299 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2300 const Value &value) {
2301 set_arg_local_impl<i>(
ttg::Void{}, value);
2304 template <std::
size_t i,
typename Key = keyT,
typename Value>
2305 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2306 std::shared_ptr<const Value> &valueptr) {
2307 set_arg_local_impl<i>(
ttg::Void{}, *valueptr);
2310 template <
typename Key>
2312 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2313 auto &world_impl = world.impl();
2316 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2317 int32_t priority = 0;
2318 if constexpr (!keyT_is_Void) {
2319 priority = priomap(key);
2321 newtask =
new (taskobj)
task_t(key, mempool, &this->
self, world_impl.taskpool(),
this, priority);
2323 priority = priomap();
2325 newtask =
new (taskobj)
task_t(mempool, &this->
self, world_impl.taskpool(),
this, priority);
2328 for (
int i = 0; i < static_stream_goal.size(); ++i) {
2329 newtask->
streams[i].goal = static_stream_goal[i];
2337 template <std::
size_t i>
2341 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2342 auto &world_impl = world.impl();
2345 char *taskobj = (
char *)parsec_thread_mempool_allocate(mempool);
2347 int32_t priority = 0;
2348 if constexpr (!keyT_is_Void) {
2349 priority = priomap(task->
key);
2352 priority = priomap();
2357 world_impl.taskpool(), priority, is_first);
2364 template <std::
size_t i,
typename Key,
typename Value>
2366 parsec_task_t **task_ring =
nullptr) {
2367 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2368 constexpr
const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2369 constexpr
const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2370 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2373 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": received value for argument : ", i);
2375 parsec_key_t hk = 0;
2376 if constexpr (!keyT_is_Void) {
2377 hk =
reinterpret_cast<parsec_key_t
>(&key);
2378 assert(keymap(key) == world.rank());
2382 auto &world_impl = world.impl();
2383 auto &reducer = std::get<i>(input_reducers);
2385 bool remove_from_hash =
true;
2386 #if defined(PARSEC_PROF_GRAPHER)
2387 bool discover_task =
true;
2389 bool get_pull_data =
false;
2390 bool has_lock =
false;
2392 if (numins > 1 || reducer) {
2395 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
2397 world_impl.increment_created();
2400 if( world_impl.dag_profiling() ) {
2401 #if defined(PARSEC_PROF_GRAPHER)
2402 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2406 }
else if (!reducer && numins == (task->
in_data_count + 1)) {
2408 parsec_hash_table_nolock_remove(&
tasks_table, hk);
2409 remove_from_hash =
false;
2413 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2418 world_impl.increment_created();
2419 remove_from_hash =
false;
2420 if( world_impl.dag_profiling() ) {
2421 #if defined(PARSEC_PROF_GRAPHER)
2422 parsec_prof_grapher_task(&task->
parsec_task, world_impl.execution_stream()->th_id, 0,
2428 if( world_impl.dag_profiling() ) {
2429 #if defined(PARSEC_PROF_GRAPHER)
2434 if(orig_index >= 0) {
2435 snprintf(orig_str, 32,
"%d", orig_index);
2437 strncpy(orig_str,
"_", 32);
2439 snprintf(dest_str, 32,
"%lu", i);
2440 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2441 .flow_index = 0, .flow_datatype_mask = ~0 };
2442 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2443 .flow_index = 0, .flow_datatype_mask = ~0 };
2454 if (
nullptr != copy) {
2456 copy = detail::register_data_copy<valueT>(copy, task,
is_const);
2467 if (reducer && 1 != task->
streams[i].goal) {
2468 auto submit_reducer_task = [&](
auto *parent_task){
2470 std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2475 reduce_task = create_new_reducer_task<i>(parent_task,
false);
2480 if constexpr (!ttg::meta::is_void_v<valueT>) {
2483 if (
nullptr == (copy = task->
copies[i])) {
2484 using decay_valueT = std::decay_t<valueT>;
2489 reduce_task = create_new_reducer_task<i>(task,
true);
2493 task->
streams[i].reduce_count.store(1, std::memory_order_relaxed);
2509 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2512 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2518 parsec_lifo_push(&task->
streams[i].reduce_copies, ©->
super);
2519 submit_reducer_task(task);
2523 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2525 submit_reducer_task(task);
2535 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
2538 if constexpr (!valueT_is_Void) {
2539 if (
nullptr != task->
copies[i]) {
2541 throw std::logic_error(
"bad set arg");
2561 if constexpr (!ttg::meta::is_void_v<keyT>) {
2562 if (get_pull_data) {
2569 bool constrained =
false;
2570 if (constraints_check.size() > 0) {
2571 if constexpr (ttg::meta::is_void_v<keyT>) {
2572 constrained = !constraints_check[0]();
2574 constrained = !constraints_check[0](task->
key);
2581 return !constrained;
2584 template<
typename Key = keyT>
2587 assert(cid < constraints_check.size());
2589 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2590 if (!constraints_check[i]()) {
2598 parsec_key_t hk = 0;
2600 assert(task !=
nullptr);
2601 auto &world_impl = world.impl();
2602 parsec_execution_stream_t *es = world_impl.execution_stream();
2603 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2604 __parsec_schedule_vp(es, vp_task_rings, 0);
2608 template<
typename Key = keyT>
2609 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
release_constraint(std::size_t cid,
const std::span<Key>& keys) {
2610 assert(cid < constraints_check.size());
2611 parsec_task_t *task_ring =
nullptr;
2612 for (
auto& key : keys) {
2615 for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2616 if (!constraints_check[i](key)) {
2624 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
2626 assert(task !=
nullptr);
2627 if (task_ring ==
nullptr) {
2632 parsec_list_item_ring_push_sorted(&task_ring->super, &task->
parsec_task.super,
2633 offsetof(parsec_task_t, priority));
2637 if (
nullptr != task_ring) {
2638 auto &world_impl = world.impl();
2639 parsec_execution_stream_t *es = world_impl.execution_stream();
2640 parsec_task_t *vp_task_rings[1] = { task_ring };
2641 __parsec_schedule_vp(es, vp_task_rings, 0);
2646 parsec_task_t **task_ring =
nullptr) {
2647 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2656 count = parsec_atomic_fetch_inc_int32(&task->
in_data_count) + 1;
2657 assert(count <=
self.dependencies_goal);
2660 auto &world_impl = world.impl();
2661 ttT *baseobj = task->
tt;
2663 if (count == numins) {
2664 parsec_execution_stream_t *es = world_impl.execution_stream();
2665 parsec_key_t hk = task->
pkey();
2667 if constexpr (!keyT_is_Void) {
2676 if (
nullptr == task_ring) {
2677 parsec_task_t *vp_task_rings[1] = { &task->
parsec_task };
2678 __parsec_schedule_vp(es, vp_task_rings, 0);
2679 }
else if (*task_ring ==
nullptr) {
2684 parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->
parsec_task.super,
2685 offsetof(parsec_task_t, priority));
2688 }
else if constexpr (!ttg::meta::is_void_v<keyT>) {
2689 if ((baseobj->num_pullins + count == numins) && baseobj->
is_lazy_pull()) {
2697 template <std::
size_t i,
typename Key,
typename Value>
2698 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
const Key &key,
2700 set_arg_impl<i>(key, std::forward<Value>(value));
2704 template <std::
size_t i,
typename Key,
typename Value>
2705 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(Value &&value) {
2706 set_arg_impl<i>(
ttg::Void{}, std::forward<Value>(value));
2709 template <std::
size_t i,
typename Key = keyT>
2710 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_arg() {
2715 template <std::
size_t i,
typename Key>
2716 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
set_arg(
const Key &key) {
2720 template<
typename Value,
typename Key>
2722 using decvalueT = std::decay_t<Value>;
2723 bool inline_data =
false;
2725 std::size_t iov_size = 0;
2726 std::size_t metadata_size = 0;
2729 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2730 iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2731 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2732 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2738 [](std::size_t s,
auto& iov){ return s + iov.num_bytes; });
2742 std::size_t pack_size = key_pack_size + metadata_size + iov_size;
2750 template <std::
size_t i,
typename Key,
typename Value>
2753 using decvalueT = std::decay_t<Value>;
2754 using norefvalueT = std::remove_reference_t<Value>;
2755 norefvalueT *value_ptr = &value;
2757 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2758 if(world.impl().profiling()) {
2759 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2763 if constexpr (!ttg::meta::is_void_v<Key>)
2764 owner = keymap(key);
2767 if (owner == world.rank()) {
2768 if constexpr (!ttg::meta::is_void_v<keyT>)
2769 set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2771 set_arg_local_impl<i>(
ttg::Void{}, std::forward<Value>(value), copy_in);
2772 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2773 if(world.impl().profiling()) {
2774 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2783 auto &world_impl = world.impl();
2786 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2789 if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2793 if (
nullptr == copy) {
2795 if (
nullptr == copy) {
2799 value_ptr =
static_cast<norefvalueT*
>(copy->
get_ptr());
2804 msg->tt_id.inline_data = inline_data;
2806 auto handle_iovec_fn = [&](
auto&& iovecs){
2810 for (
auto &&iov : iovecs) {
2811 std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
2812 pos += iov.num_bytes;
2819 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
2820 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
2821 pos +=
sizeof(cbtag);
2827 for (
auto &&iov : iovecs) {
2828 copy = detail::register_data_copy<decvalueT>(copy,
nullptr,
true);
2829 parsec_ce_mem_reg_handle_t lreg;
2832 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2833 iov.num_bytes, &lreg, &lreg_size);
2834 auto lreg_ptr = std::shared_ptr<void>{lreg, [](
void *
ptr) {
2835 parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)
ptr;
2836 parsec_ce.mem_unregister(&memreg);
2838 int32_t lreg_size_i = lreg_size;
2839 std::memcpy(msg->bytes + pos, &lreg_size_i,
sizeof(lreg_size_i));
2840 pos +=
sizeof(lreg_size_i);
2841 std::memcpy(msg->bytes + pos, lreg, lreg_size);
2845 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
2851 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
2852 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
2853 pos +=
sizeof(fn_ptr);
2860 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(value_ptr));
2861 num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2863 auto metadata = descr.get_metadata(*
const_cast<decvalueT *
>(value_ptr));
2864 pos =
pack(metadata, msg->bytes, pos);
2866 handle_iovec_fn(iovs);
2870 pos =
pack(*value_ptr, msg->bytes, pos, copy);
2878 msg->tt_id.num_iovecs = num_iovecs;
2882 msg->tt_id.num_keys = 0;
2883 msg->tt_id.key_offset = pos;
2884 if constexpr (!ttg::meta::is_void_v<Key>) {
2885 size_t tmppos =
pack(key, msg->bytes, pos);
2887 msg->tt_id.num_keys = 1;
2890 parsec_taskpool_t *tp = world_impl.taskpool();
2891 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2892 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2894 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
2896 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2897 if(world.impl().profiling()) {
2898 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2901 #if defined(PARSEC_PROF_GRAPHER)
2906 if(orig_index >= 0) {
2907 snprintf(orig_str, 32,
"%d", orig_index);
2909 strncpy(orig_str,
"_", 32);
2911 snprintf(dest_str, 32,
"%lu", i);
2912 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2913 .flow_index = 0, .flow_datatype_mask = ~0 };
2914 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2915 .flow_index = 0, .flow_datatype_mask = ~0 };
2923 template <
int i,
typename Iterator,
typename Value>
2925 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2926 if(world.impl().profiling()) {
2927 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2930 parsec_task_t *task_ring =
nullptr;
2936 for (
auto it = begin; it != end; ++it) {
2937 set_arg_local_impl<i>(*it, value, copy, &task_ring);
2940 if (
nullptr != task_ring) {
2941 parsec_task_t *vp_task_ring[1] = { task_ring };
2942 __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2944 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2945 if(world.impl().profiling()) {
2946 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2951 template <std::
size_t i,
typename Key,
typename Value>
2952 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2955 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2957 auto np = world.size();
2958 int rank = world.rank();
2960 bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
2961 [&](
const Key &key) { return keymap(key) != rank; });
2964 using decvalueT = std::decay_t<Value>;
2967 std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
2968 std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](
const Key &a,
const Key &b)
mutable {
2969 int rank_a = keymap(a);
2970 int rank_b = keymap(b);
2972 int pos_a = (rank_a + np - rank) % np;
2973 int pos_b = (rank_b + np - rank) % np;
2974 return pos_a < pos_b;
2978 auto local_begin = keylist_sorted.end();
2979 auto local_end = keylist_sorted.end();
2981 int32_t num_iovs = 0;
2985 assert(
nullptr != copy);
2988 auto &world_impl = world.impl();
2989 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
2994 bool inline_data =
can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
2995 msg->tt_id.inline_data = inline_data;
2997 std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
2998 auto handle_iovs_fn = [&](
auto&& iovs){
3002 for (
auto &&iov : iovs) {
3003 std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
3004 pos += iov.num_bytes;
3011 parsec_ce_tag_t cbtag =
reinterpret_cast<parsec_ce_tag_t
>(&detail::get_remote_complete_cb);
3012 std::memcpy(msg->bytes + pos, &cbtag,
sizeof(cbtag));
3013 pos +=
sizeof(cbtag);
3015 for (
auto &&iov : iovs) {
3016 parsec_ce_mem_reg_handle_t lreg;
3018 parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
3019 iov.num_bytes, &lreg, &lreg_size);
3021 memregs.push_back(std::make_pair(
static_cast<int32_t
>(lreg_size),
3023 std::shared_ptr<void>{lreg, [](
void *
ptr) {
3024 parsec_ce_mem_reg_handle_t memreg =
3025 (parsec_ce_mem_reg_handle_t)
ptr;
3027 parsec_ce.mem_unregister(&memreg);
3037 auto metadata = descr.get_metadata(value);
3038 pos =
pack(metadata, msg->bytes, pos);
3039 auto iovs = descr.get_data(*
const_cast<decvalueT *
>(&value));
3040 num_iovs = std::distance(std::begin(iovs), std::end(iovs));
3041 memregs.reserve(num_iovs);
3042 handle_iovs_fn(iovs);
3046 pos =
pack(value, msg->bytes, pos, copy);
3052 msg->tt_id.num_iovecs = num_iovs;
3054 std::size_t save_pos = pos;
3056 parsec_taskpool_t *tp = world_impl.taskpool();
3057 for (
auto it = keylist_sorted.begin(); it < keylist_sorted.end(); ) {
3059 auto owner = keymap(*it);
3060 if (owner ==
rank) {
3064 std::find_if_not(++it, keylist_sorted.end(), [&](
const Key &key) { return keymap(key) == rank; });
3077 for (
int idx = 0; idx < num_iovs; ++idx) {
3080 std::shared_ptr<void> lreg_ptr;
3081 std::tie(lreg_size, lreg_ptr) = memregs[idx];
3082 std::memcpy(msg->bytes + pos, &lreg_size,
sizeof(lreg_size));
3083 pos +=
sizeof(lreg_size);
3084 std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3088 copy = detail::register_data_copy<valueT>(copy,
nullptr,
true);
3090 std::function<void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
3096 std::intptr_t fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
3097 std::memcpy(msg->bytes + pos, &fn_ptr,
sizeof(fn_ptr));
3098 pos +=
sizeof(fn_ptr);
3103 msg->tt_id.key_offset = pos;
3109 pos =
pack(*it, msg->bytes, pos);
3111 }
while (it < keylist_sorted.end() && keymap(*it) == owner);
3112 msg->tt_id.num_keys = num_keys;
3114 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3115 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3117 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3121 broadcast_arg_local<i>(local_begin, local_end, value);
3124 broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
3131 template <
typename Key,
typename... Ts,
size_t... Is,
size_t... Js>
3132 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>,
3133 std::index_sequence<Js...>,
const Key &key,
3134 const std::tuple<Ts...> &args) {
3135 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3136 constexpr
size_t js[] = {Js...};
3137 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3143 template <
typename Key,
typename... Ts,
size_t... Is>
3144 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
const Key &key,
3145 const std::tuple<Ts...> &args) {
3146 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3152 template <
typename Key = keyT,
typename... Ts,
size_t... Is,
size_t... Js>
3153 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3154 const std::tuple<Ts...> &args) {
3155 static_assert(
sizeof...(Js) ==
sizeof...(Is));
3156 constexpr
size_t js[] = {Js...};
3157 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3163 template <
typename Key = keyT,
typename... Ts,
size_t... Is>
3164 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...> is,
3165 const std::tuple<Ts...> &args) {
3166 set_args(std::index_sequence_for<Ts...>{}, is, args);
3172 template <std::
size_t i>
3174 assert(std::get<i>(input_reducers) &&
"TT::set_static_argstream_size called on nonstreaming input terminal");
3175 assert(
size > 0 &&
"TT::set_static_argstream_size(key,size) called with size=0");
3177 this->
trace(world.rank(),
":",
get_name(),
": setting global stream size for terminal ", i);
3180 if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3182 throw std::runtime_error(
"TT::set_static_argstream_size called for a bounded stream");
3185 static_stream_goal[i] =
size;
3191 template <std::
size_t i,
typename Key>
3194 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3195 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3198 const auto owner = keymap(key);
3199 if (owner != world.rank()) {
3200 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : forwarding stream size for terminal ", i);
3202 auto &world_impl = world.impl();
3204 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3206 world_impl.rank(), 1);
3208 pos =
pack(key, msg->bytes, pos);
3210 parsec_taskpool_t *tp = world_impl.taskpool();
3211 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3212 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3213 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3216 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : setting stream size to ",
size,
" for terminal ", i);
3218 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3221 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3223 world.impl().increment_created();
3225 if( world.impl().dag_profiling() ) {
3226 #if defined(PARSEC_PROF_GRAPHER)
3227 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3231 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3241 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3243 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3252 template <std::
size_t i,
typename Key = keyT>
3255 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3256 assert(
size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3259 const auto owner = keymap();
3260 if (owner != world.rank()) {
3261 ttg::trace(world.rank(),
":",
get_name(),
" : forwarding stream size for terminal ", i);
3263 auto &world_impl = world.impl();
3265 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3267 world_impl.rank(), 0);
3269 parsec_taskpool_t *tp = world_impl.taskpool();
3270 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3271 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3272 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3277 parsec_key_t hk = 0;
3280 if (
nullptr == (task = (
task_t *)parsec_hash_table_nolock_find(&
tasks_table, hk))) {
3282 world.impl().increment_created();
3284 if( world.impl().dag_profiling() ) {
3285 #if defined(PARSEC_PROF_GRAPHER)
3286 parsec_prof_grapher_task(&task->
parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->
parsec_task.locals[0]));
3290 parsec_hash_table_unlock_bucket(&
tasks_table, hk);
3300 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3302 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3310 template <std::
size_t i,
typename Key>
3313 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3316 const auto owner = keymap(key);
3317 if (owner != world.rank()) {
3318 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": forwarding stream finalize for terminal ", i);
3320 auto &world_impl = world.impl();
3322 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3324 world_impl.rank(), 1);
3326 pos =
pack(key, msg->bytes, pos);
3327 parsec_taskpool_t *tp = world_impl.taskpool();
3328 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3329 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3330 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3333 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": finalizing stream for terminal ", i);
3335 auto hk =
reinterpret_cast<parsec_key_t
>(&key);
3340 " : error finalize called on stream that never received an input data: ", i);
3341 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3352 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3354 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3355 if (1 == c && (task->
streams[i].size >= 1)) {
3362 template <std::
size_t i,
bool key_is_
void = ttg::meta::is_
void_v<keyT>>
3365 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3368 const auto owner = keymap();
3369 if (owner != world.rank()) {
3370 ttg::trace(world.rank(),
":",
get_name(),
": forwarding stream finalize for terminal ", i);
3372 auto &world_impl = world.impl();
3374 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(
get_instance_id(), world_impl.taskpool()->taskpool_id,
3376 world_impl.rank(), 0);
3377 parsec_taskpool_t *tp = world_impl.taskpool();
3378 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3379 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3380 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner,
static_cast<void *
>(msg.get()),
3385 auto hk =
static_cast<parsec_key_t
>(0);
3389 " : error finalize called on stream that never received an input data: ", i);
3390 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3401 task->
streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3403 auto c = task->
streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3404 if (1 == c && (task->
streams[i].size >= 1)) {
3414 auto check_parsec_data = [&](parsec_data_t*
data) {
3415 if (
data->owner_device != 0) {
3418 while (flowidx < MAX_PARAM_COUNT &&
3419 gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3426 if (flowidx == MAX_PARAM_COUNT) {
3427 throw std::runtime_error(
"Cannot add more than MAX_PARAM_COUNT flows to a task!");
3429 if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3432 gpu_task->flow_nb_elts[flowidx] =
data->nb_elts;
3435 ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_WRITE;
3436 gpu_task->pushout |= 1<<flowidx;
3444 template <std::
size_t i,
typename Value,
typename RemoteCheckFn>
3445 std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3448 constexpr
const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
3455 if (
nullptr == copy) {
3460 bool need_pushout =
false;
3468 auto &reducer = std::get<i>(input_reducers);
3476 if constexpr (value_is_const) {
3495 need_pushout =
true;
3502 need_pushout =
true;
3506 need_pushout =
true;
3513 need_pushout =
true;
3517 if (!need_pushout) {
3518 bool device_supported =
false;
3528 if (!device_supported) {
3529 need_pushout = remote_check();
3540 template <std::
size_t i,
typename Key,
typename Value>
3541 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3544 auto remote_check = [&](){
3546 int rank = world.rank();
3547 bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3548 [&](
const Key &key) { return keymap(key) != rank; });
3551 do_prepare_send<i>(value, remote_check);
3554 template <std::
size_t i,
typename Key,
typename Value>
3555 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3558 auto remote_check = [&](){
3560 int rank = world.rank();
3561 return (keymap() !=
rank);
3563 do_prepare_send<i>(value, remote_check);
3576 TT(
const TT &other) =
delete;
3577 TT &operator=(
const TT &other) =
delete;
3578 TT(
TT &&other) =
delete;
3579 TT &operator=(
TT &&other) =
delete;
3582 template <
typename terminalT, std::
size_t i>
3583 void register_input_callback(terminalT &input) {
3584 using valueT = std::decay_t<typename terminalT::value_type>;
3585 if (input.is_pull_terminal) {
3591 if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3592 auto move_callback = [
this](
const keyT &key, valueT &&value) {
3593 set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3595 auto send_callback = [
this](
const keyT &key,
const valueT &value) {
3596 set_arg<i, keyT, const valueT &>(key, value);
3598 auto broadcast_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3599 broadcast_arg<i, keyT, valueT>(keylist, value);
3601 auto prepare_send_callback = [
this](
const ttg::span<const keyT> &keylist,
const valueT &value) {
3602 prepare_send<i, keyT, valueT>(keylist, value);
3604 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3605 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3606 input.set_callback(send_callback, move_callback, broadcast_callback,
3607 setsize_callback, finalize_callback, prepare_send_callback);
3612 else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3613 auto send_callback = [
this](
const keyT &key) { set_arg<i, keyT, ttg::Void>(key,
ttg::Void{}); };
3614 auto setsize_callback = [
this](
const keyT &key, std::size_t
size) { set_argstream_size<i>(key,
size); };
3615 auto finalize_callback = [
this](
const keyT &key) { finalize_argstream<i>(key); };
3616 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3625 else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3626 auto move_callback = [
this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3627 auto send_callback = [
this](
const valueT &value) {
3628 if constexpr (std::is_copy_constructible_v<valueT>) {
3629 set_arg<i, keyT, const valueT &>(value);
3632 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() +
" which is not copy constructible, std::move datum into send/broadcast statement");
3635 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3636 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3637 auto prepare_send_callback = [
this](
const valueT &value) {
3638 prepare_send<i, void>(value);
3640 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3645 else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3646 auto send_callback = [
this]() { set_arg<i, keyT, ttg::Void>(
ttg::Void{}); };
3647 auto setsize_callback = [
this](std::size_t
size) { set_argstream_size<i>(
size); };
3648 auto finalize_callback = [
this]() { finalize_argstream<i>(); };
3649 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3659 template <std::size_t... IS>
3660 void register_input_callbacks(std::index_sequence<IS...>) {
3663 (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3668 template <std::size_t... IS,
typename inedgesT>
3669 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3670 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3674 template <std::size_t... IS,
typename outedgesT>
3675 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3676 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3681 template <
typename input_terminals_tupleT, std::size_t... IS,
typename flowsT>
3682 void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3684 (*(
const_cast<std::remove_const_t<decltype(flows[IS]-
>flow_flags)> *>(&(flows[IS]->flow_flags))) =
3685 (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3686 : PARSEC_FLOW_ACCESS_RW),
3691 template <
typename input_terminals_tupleT,
typename flowsT>
3692 void initialize_flows(flowsT &&flows) {
3693 _initialize_flows<input_terminals_tupleT>(
3700 static int key_equal(parsec_key_t a, parsec_key_t b,
void *user_data) {
3701 if constexpr (std::is_same_v<keyT, void>) {
3704 keyT &ka = *(
reinterpret_cast<keyT *
>(a));
3705 keyT &kb = *(
reinterpret_cast<keyT *
>(b));
3710 static uint64_t key_hash(parsec_key_t k,
void *user_data) {
3711 constexpr
const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3712 if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3715 keyT &kk = *(
reinterpret_cast<keyT *
>(k));
3717 uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3722 static char *key_print(
char *buffer,
size_t buffer_size, parsec_key_t k,
void *user_data) {
3723 if constexpr (std::is_same_v<keyT, void>) {
3727 keyT kk = *(
reinterpret_cast<keyT *
>(k));
3728 std::stringstream iss;
3730 memset(buffer, 0, buffer_size);
3731 iss.get(buffer, buffer_size);
3736 static parsec_key_t make_key(
const parsec_taskpool_t *tp,
const parsec_assignment_t *as) {
3738 keyT *key = *(keyT**)&(as[2]);
3739 return reinterpret_cast<parsec_key_t
>(key);
3742 static char *parsec_ttg_task_snprintf(
char *buffer,
size_t buffer_size,
const parsec_task_t *parsec_task) {
3743 if(buffer_size == 0)
3746 if constexpr (ttg::meta::is_void_v<keyT>) {
3747 snprintf(buffer, buffer_size,
"%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3749 const task_t *task =
reinterpret_cast<const task_t*
>(parsec_task);
3750 std::stringstream ss;
3753 std::string keystr = ss.str();
3754 std::replace(keystr.begin(), keystr.end(),
'(',
':');
3755 std::replace(keystr.begin(), keystr.end(),
')',
':');
3757 snprintf(buffer, buffer_size,
"%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3762 #if defined(PARSEC_PROF_TRACE)
3763 static void *parsec_ttg_task_info(
void *dst,
const void *
data,
size_t size)
3765 const task_t *task =
reinterpret_cast<const task_t *
>(
data);
3767 if constexpr (ttg::meta::is_void_v<keyT>) {
3768 snprintf(
reinterpret_cast<char*
>(dst),
size,
"()");
3770 std::stringstream ss;
3772 snprintf(
reinterpret_cast<char*
>(dst),
size,
"%s", ss.str().c_str());
3778 parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3780 template<std::
size_t I>
3781 inline static void increment_data_version_impl(task_t *task) {
3782 if constexpr (!
std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
3783 if (task->copies[I] !=
nullptr){
3784 task->copies[I]->inc_current_version();
3789 template<std::size_t... Is>
3790 inline static void increment_data_versions(task_t *task, std::index_sequence<Is...>) {
3792 int junk[] = {0, (increment_data_version_impl<Is>(task), 0)...};
3796 static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3800 task_t *task = (task_t*)parsec_task;
3802 #ifdef TTG_HAVE_COROUTINE
3804 if (task->suspended_task_address) {
3806 #ifdef TTG_HAVE_DEVICE
3813 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3816 auto dev_data = dev_task.promise();
3818 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3819 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3822 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3825 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3826 task->tt->set_outputs_tls_ptr();
3827 dev_data.do_sends();
3828 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3835 task->suspended_task_address =
nullptr;
3840 for (
int i = 0; i < task->data_count; i++) {
3841 detail::ttg_data_copy_t *copy = task->
copies[i];
3842 if (
nullptr == copy)
continue;
3844 task->copies[i] =
nullptr;
3847 for (
auto& c : task->tt->constraints_complete) {
3848 if constexpr(std::is_void_v<keyT>) {
3854 return PARSEC_HOOK_RETURN_DONE;
3858 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3859 typename priomapT = ttg::detail::default_priomap<keyT>>
3860 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
3861 ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3862 :
ttg::
TTBase(name, numinedges, numouts)
3865 , keymap(std::is_same<keymapT,
ttg::detail::default_keymap<keyT>>::value
3866 ? decltype(keymap)(
ttg::detail::default_keymap<keyT>(world))
3867 : decltype(keymap)(std::forward<keymapT>(keymap_)))
3868 , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3870 if (innames.size() != numinedges)
throw std::logic_error(
"ttg_parsec::TT: #input names != #input terminals");
3871 if (outnames.size() != numouts)
throw std::logic_error(
"ttg_parsec::TT: #output names != #output terminals");
3873 auto &world_impl = world.
impl();
3874 world_impl.register_op(
this);
3876 if constexpr (numinedges == numins) {
3884 register_input_callbacks(std::make_index_sequence<numinedges>{});
3887 memset(&
self, 0,
sizeof(parsec_task_class_t));
3889 self.name = strdup(
get_name().c_str());
3891 self.nb_parameters = 0;
3894 self.nb_flows = MAX_PARAM_COUNT;
3897 if( world_impl.profiling() ) {
3899 self.nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3901 self.nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3912 self.make_key = make_key;
3913 self.key_functions = &tasks_hash_fcts;
3914 self.task_snprintf = parsec_ttg_task_snprintf;
3916 #if defined(PARSEC_PROF_TRACE)
3917 self.profile_info = &parsec_ttg_task_info;
3920 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
3925 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3926 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
3927 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3928 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3929 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3930 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3931 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3933 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3934 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_HIP;
3935 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3936 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_hip<TT>;
3938 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3939 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3940 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3941 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3943 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
3944 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3945 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3946 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3948 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3949 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3950 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3953 self.incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
3954 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CPU;
3955 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
3956 ((__parsec_chore_t *)
self.incarnations)[0].hook = &detail::hook<TT>;
3957 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_NONE;
3958 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
3959 ((__parsec_chore_t *)
self.incarnations)[1].hook = NULL;
3963 self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3964 self.complete_execution = complete_task_and_release;
3966 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3967 parsec_flow_t *flow =
new parsec_flow_t;
3968 flow->name = strdup((std::string(
"flow in") + std::to_string(i)).c_str());
3969 flow->sym_type = PARSEC_SYM_INOUT;
3972 flow->dep_in[0] = NULL;
3973 flow->dep_out[0] = NULL;
3974 flow->flow_index = i;
3975 flow->flow_datatype_mask = ~0;
3976 *((parsec_flow_t **)&(
self.
in[i])) = flow;
3981 for (i = 0; i < MAX_PARAM_COUNT; i++) {
3982 parsec_flow_t *flow =
new parsec_flow_t;
3983 flow->name = strdup((std::string(
"flow out") + std::to_string(i)).c_str());
3984 flow->sym_type = PARSEC_SYM_INOUT;
3985 flow->flow_flags = PARSEC_FLOW_ACCESS_READ;
3986 flow->dep_in[0] = NULL;
3987 flow->dep_out[0] = NULL;
3988 flow->flow_index = i;
3989 flow->flow_datatype_mask = (1 << i);
3990 *((parsec_flow_t **)&(
self.
out[i])) = flow;
3995 self.dependencies_goal = numins;
3998 auto *context = world_impl.context();
3999 for (
int i = 0; i < context->nb_vp; i++) {
4000 nbthreads += context->virtual_processes[i]->nb_cores;
4003 parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t),
sizeof(
task_t),
4004 offsetof(parsec_task_t, mempool_owner), nbthreads);
4013 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4014 typename priomapT = ttg::detail::default_priomap<keyT>>
4015 TT(
const std::string &name,
const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4018 std::forward<priomapT>(priomap)) {}
4020 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4021 typename priomapT = ttg::detail::default_priomap<keyT>>
4023 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
ttg::World world,
4024 keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
4025 :
TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
4026 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
4027 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
4029 if constexpr (numinedges > 0) {
4030 register_input_callbacks(std::make_index_sequence<numinedges>{});
4033 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4034 typename priomapT = ttg::detail::default_priomap<keyT>>
4036 const std::vector<std::string> &innames,
const std::vector<std::string> &outnames,
4039 std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4043 if(
nullptr !=
self.name ) {
4044 free((
void*)
self.name);
4045 self.name =
nullptr;
4048 for (std::size_t i = 0; i < numins; ++i) {
4049 if (inpute_reducers_taskclass[i] !=
nullptr) {
4050 std::free(inpute_reducers_taskclass[i]);
4051 inpute_reducers_taskclass[i] =
nullptr;
4059 ttT *op = (
ttT *)cb_data;
4060 if constexpr (!ttg::meta::is_void_v<keyT>) {
4061 std::cout <<
"Left over task " << op->
get_name() <<
" " << task->
key << std::endl;
4063 std::cout <<
"Left over task " << op->
get_name() << std::endl;
4081 parsec_mempool_destruct(&mempools);
4084 free((__parsec_chore_t *)
self.incarnations);
4085 for (
int i = 0; i < MAX_PARAM_COUNT; i++) {
4086 if (NULL !=
self.
in[i]) {
4087 free(
self.
in[i]->name);
4089 self.in[i] =
nullptr;
4091 if (NULL !=
self.
out[i]) {
4092 free(
self.
out[i]->name);
4094 self.out[i] =
nullptr;
4097 world.
impl().deregister_op(
this);
4107 template <std::
size_t i,
typename Reducer>
4110 std::get<i>(input_reducers) = reducer;
4112 parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4113 if (
nullptr == tc) {
4114 tc = (parsec_task_class_t *)std::calloc(1,
sizeof(*tc));
4115 inpute_reducers_taskclass[i] = tc;
4117 tc->name = strdup((
get_name() + std::string(
" reducer ") + std::to_string(i)).c_str());
4119 tc->nb_parameters = 0;
4121 tc->nb_flows = numflows;
4123 auto &world_impl = world.
impl();
4125 if( world_impl.profiling() ) {
4127 tc->nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4129 tc->nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4140 tc->make_key = make_key;
4141 tc->key_functions = &tasks_hash_fcts;
4142 tc->task_snprintf = parsec_ttg_task_snprintf;
4144 #if defined(PARSEC_PROF_TRACE)
4145 tc->profile_info = &parsec_ttg_task_info;
4148 world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes,
static_cast<decltype(world_impl.taskpool()-
>nb_task_classes)>(
self.task_class_id+1));
4153 self.incarnations = (__parsec_chore_t *)malloc(3 *
sizeof(__parsec_chore_t));
4154 ((__parsec_chore_t *)
self.incarnations)[0].type = PARSEC_DEV_CUDA;
4155 ((__parsec_chore_t *)
self.incarnations)[0].evaluate = NULL;
4157 ((__parsec_chore_t *)
self.incarnations)[1].type = PARSEC_DEV_CPU;
4158 ((__parsec_chore_t *)
self.incarnations)[1].evaluate = NULL;
4159 ((__parsec_chore_t *)
self.incarnations)[1].hook =
detail::hook;
4160 ((__parsec_chore_t *)
self.incarnations)[2].type = PARSEC_DEV_NONE;
4161 ((__parsec_chore_t *)
self.incarnations)[2].evaluate = NULL;
4162 ((__parsec_chore_t *)
self.incarnations)[2].hook = NULL;
4166 tc->incarnations = (__parsec_chore_t *)malloc(2 *
sizeof(__parsec_chore_t));
4167 ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4168 ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4169 ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4170 ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4171 ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4172 ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4176 tc->release_task = &parsec_release_task_to_mempool;
4177 tc->complete_execution = NULL;
4188 template <std::
size_t i,
typename Reducer>
4190 set_input_reducer<i>(std::forward<Reducer>(reducer));
4191 set_static_argstream_size<i>(
size);
4196 template <std::
size_t i>
4197 std::tuple_element_t<i, input_terminals_type> *
in() {
4198 return &std::get<i>(input_terminals);
4203 template <std::
size_t i>
4204 std::tuple_element_t<i, output_terminalsT> *
out() {
4205 return &std::get<i>(output_terminals);
4209 template <
typename Key = keyT>
4210 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4213 if constexpr(!std::is_same_v<Key, key_type>) {
4218 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4220 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4221 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4226 template <
typename Key = keyT>
4227 std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4231 set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
4233 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4234 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4238 template <
typename Key = keyT>
4239 std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4243 if constexpr(!std::is_same_v<Key, key_type>) {
4248 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4249 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4254 template <
typename Key = keyT>
4255 std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke() {
4258 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4259 set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4264 if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4271 template<
typename Key,
typename Arg,
typename... Args, std::size_t I, std::size_t... Is>
4272 void invoke_arglist(std::index_sequence<I, Is...>,
const Key& key, Arg&& arg, Args&&... args) {
4273 using arg_type = std::decay_t<Arg>;
4274 if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4279 copy->reset_readers();
4281 set_arg_impl<I>(key, val, copy);
4283 if constexpr (std::is_rvalue_reference_v<Arg>) {
4287 }
else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4288 set_arg<I>(key, std::forward<Arg>(arg));
4290 if constexpr (
sizeof...(Is) > 0) {
4292 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4298 template <
typename Key = keyT,
typename Arg,
typename... Args>
4299 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>,
void>
invoke(
4300 const Key &key, Arg&& arg, Args&&... args) {
4301 static_assert(
sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4302 "Number of arguments to invoke must match the number of task inputs.");
4305 invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4306 std::forward<Arg>(arg), std::forward<Args>(args)...);
4309 using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4310 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4314 m_defer_writer = value;
4318 return m_defer_writer;
4323 world.
impl().register_tt_profiling(
this);
4333 template <
typename Keymap>
4344 template <
typename Priomap>
4346 priomap = std::forward<Priomap>(pm);
4354 template<
typename Devicemap>
4359 devicemap = std::forward<Devicemap>(dm);
4362 devicemap = [=](
const keyT& key) {
4370 throw std::runtime_error(
"Unknown device type!");
4383 template<
typename Constra
int>
4385 std::size_t cid = constraints_check.size();
4386 if constexpr(ttg::meta::is_void_v<keyT>) {
4388 constraints_check.push_back([c,
this](){
return c->check(
this); });
4389 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(
this);
return true; });
4391 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4392 constraints_check.push_back([c,
this](
const keyT& key){
return c->check(key,
this); });
4393 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(key,
this);
return true; });
4399 template<
typename Constra
int>
4402 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4408 template<
typename Constra
int,
typename Mapper>
4410 static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
4411 std::size_t cid = constraints_check.size();
4412 if constexpr(ttg::meta::is_void_v<keyT>) {
4414 constraints_check.push_back([map, c,
this](){
return c->check(map(),
this); });
4415 constraints_complete.push_back([map, c,
this](){ c->complete(map(),
this);
return true; });
4417 c->add_listener([
this, cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4418 constraints_check.push_back([map, c,
this](
const keyT& key){
return c->check(key, map(key),
this); });
4419 constraints_complete.push_back([map, c,
this](
const keyT& key){ c->complete(key, map(key),
this);
return true; });
4426 template<
typename Constra
int,
typename Mapper>
4429 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4435 MPI_Comm_rank(MPI_COMM_WORLD, &
rank);
4438 auto &world_impl = world.
impl();
4442 auto tp = world_impl.taskpool();
4448 std::vector<static_set_arg_fct_arg_t> tmp;
4449 for (
auto it = se.first; it != se.second;) {
4451 tmp.push_back(it->second);
4456 for (
auto it : tmp) {
4459 std::get<1>(it),
", ", std::get<2>(it),
")");
4460 int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
4461 std::get<0>(it), NULL);
4463 free(std::get<1>(it));
4486 bool do_release =
true;
4492 : copy_to_remove(h.copy_to_remove)
4494 h.copy_to_remove =
nullptr;
4500 std::swap(copy_to_remove, h.copy_to_remove);
4505 if (
nullptr != copy_to_remove) {
4513 template <
typename Value>
4514 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&>
operator()(Value &&value) {
4515 constexpr
auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4516 using value_type = std::remove_reference_t<Value>;
4517 static_assert(value_is_rvref ||
4518 std::is_copy_constructible_v<std::decay_t<Value>>,
4519 "Data sent without being moved must be copy-constructible!");
4522 if (
nullptr == caller) {
4523 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4528 value_type *value_ptr = &value;
4529 if (
nullptr == copy) {
4537 value_ptr =
reinterpret_cast<value_type *
>(copy->
get_ptr());
4538 copy_to_remove = copy;
4540 if constexpr (value_is_rvref) {
4548 if constexpr (value_is_rvref)
4549 return std::move(*value_ptr);
4554 template<
typename Value>
4557 if (
nullptr == caller) {
4558 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4562 if (
nullptr == copy) {
4567 copy_to_remove = copy;
4579 template <
typename Value>
4582 if (
nullptr == caller) {
4583 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4587 const Value *value_ptr = &value;
4588 if (
nullptr == copy) {
4596 value_ptr =
reinterpret_cast<Value *
>(copy->
get_ptr());
4597 copy_to_remove = copy;
#define TTG_OP_ASSERT_EXECUTABLE()
Edge is used to connect In and Out terminals.
A base class for all template tasks.
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
auto get_instance_id() const
virtual void make_executable()=0
Marks this executable.
const std::string & get_name() const
Gets the name of this operation.
void register_input_terminals(terminalsT &terms, const namesT &names)
const TTBase * ttg_ptr() const
void register_output_terminals(terminalsT &terms, const namesT &names)
A complete version of void.
Base class for implementation-specific Worlds.
WorldImplBase(int size, int rank)
bool is_valid(void) const
Represents a device in a specific execution space.
void print_incomplete_tasks()
void add_constraint(Constraint c, Mapper &&map)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
ttg::World get_world() const override final
static constexpr int numinvals
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
void set_priomap(Priomap &&pm)
void set_input_reducer(Reducer &&reducer)
std::tuple_element_t< i, input_terminals_type > * in()
void set_devicemap(Devicemap &&dm)
void set_keymap(Keymap &&km)
keymap setter
void add_constraint(std::shared_ptr< Constraint > c)
static constexpr bool derived_has_cuda_op()
void finalize_argstream_from_msg(void *data, std::size_t size)
void set_input_reducer(Reducer &&reducer, std::size_t size)
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
bool check_constraints(task_t *task)
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
task_t * create_new_task(const Key &key)
output_terminalsT output_terminals_type
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
static auto & get(InTuple &&intuple)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
void set_defer_writer(bool value)
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
virtual void release() override
bool get_defer_writer(bool value)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
static void ht_iter_cb(void *item, void *cb_data)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
parsec_thread_mempool_t * get_task_mempool(void)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
const auto & get_output_terminals() const
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
static constexpr bool derived_has_level_zero_op()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
decltype(keymap) const & get_keymap() const
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
void copy_mark_pushout(detail::ttg_data_copy_t *copy)
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
void set_static_argstream_size(std::size_t size)
static constexpr const ttg::Runtime runtime
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
static resultT get(InTuple &&intuple)
void register_static_op_function(void)
void add_constraint(Constraint &&c)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
void argstream_set_size_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
void set_arg_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
void make_executable() override
Marks this executable.
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
static constexpr bool derived_has_hip_op()
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
decltype(priomap) const & get_priomap() const
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
std::tuple_element_t< i, output_terminalsT > * out()
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
static constexpr bool derived_has_device_op()
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
void get_from_pull_msg(void *data, std::size_t size)
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
actual_input_tuple_type input_args_type
virtual void execute() override
static constexpr int parsec_ttg_rma_tag()
void decrement_inflight_msg()
WorldImpl & operator=(const WorldImpl &other)=delete
const ttg::Edge & ctl_edge() const
void increment_inflight_msg()
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
bool mpi_support(ttg::ExecutionSpace space)
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
virtual bool profiling() override
virtual void dag_off() override
virtual void fence_impl(void) override
virtual void dag_on(const std::string &filename) override
static constexpr int parsec_ttg_tag()
virtual void final_task() override
virtual void destroy() override
virtual void profile_on() override
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
auto * execution_stream()
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
bool complete_transfer(void)
constexpr auto data(C &c) -> decltype(c.data())
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
void set_current(int device, Stream stream)
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
bool all_devices_peer_access
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
std::size_t max_inline_size
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
int ttg_device_to_parsec_device(const ttg::device::Device &device)
const parsec_symbol_t parsec_taskclass_param1
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
constexpr const int PARSEC_TTG_MAX_AM_SIZE
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
const parsec_symbol_t parsec_taskclass_param2
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
ttg_data_copy_t * create_new_datacopy(Value &&value)
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
void release_data_copy(ttg_data_copy_t *copy)
void transfer_ownership_impl(ttg_data_copy_t *copy, int device)
const parsec_symbol_t parsec_taskclass_param3
const parsec_symbol_t parsec_taskclass_param0
this contains PaRSEC-based TTG functionality
void ttg_fence(ttg::World world)
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
std::mutex static_map_mutex
void ttg_register_callback(ttg::World world, Callback &&callback)
ttg::Edge & ttg_ctl_edge(ttg::World world)
void make_executable_hook(ttg::World &)
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
ttg::World ttg_default_execution_context()
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
void ttg_execute(ttg::World world)
void ttg_sum(ttg::World world, double &value)
top-level TTG namespace contains runtime-neutral functionality
ExecutionSpace
denotes task execution space
int size(World world=default_execution_context())
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
World default_execution_context()
Accesses the default backend's default execution context.
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
bool tracing()
returns whether tracing is enabled
int rank(World world=default_execution_context())
ttg::World & get_default_world()
void trace(const T &t, const Ts &... ts)
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
value_copy_handler(value_copy_handler &&h)
value_copy_handler & operator=(value_copy_handler &&h)
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
value_copy_handler & operator=(const value_copy_handler &h)=delete
value_copy_handler()=default
Computes hash values for objects of type T.
task that can be resumed after some events occur
parsec_hash_table_t task_constraint_table
parsec_hash_table_t tasks_table
parsec_gpu_task_t * gpu_task
static constexpr std::size_t max_payload_size
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
unsigned char bytes[max_payload_size]
parsec_task_t parsec_task
ttg_data_copy_t ** copies
ttg_parsec_data_flags data_flags
parsec_hash_table_item_t tt_ht_item
std::array< stream_info_t, num_streams > streams
ttg_data_copy_t * copies[num_copies]
lvalue_reference_type value_ref
static void drop_all_ptr()
static void release_task(parsec_ttg_task_base_t *task_base)
static constexpr int mutable_tag
parsec_task_t * get_next_task() const
ttg::span< ttg::iovec > iovec_span()
virtual void * get_ptr()=0
void transfer_ownership(int access, int device=0)
void inc_current_version()
void foreach_parsec_data(Fn &&fn)
iovec_iterator iovec_begin()
void set_next_task(parsec_task_t *task)
iovec_iterator iovec_end()
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER