3#ifndef PARSEC_TTG_H_INCLUDED
4#define PARSEC_TTG_H_INCLUDED
7#if !defined(TTG_IMPL_NAME)
8#define TTG_USE_PARSEC 1
14#define TTG_PARSEC_DEFER_WRITER false
16#include "ttg/config.h"
58#include <experimental/type_traits>
73#if defined(TTG_HAVE_MPI)
75#if defined(TTG_HAVE_MPIEXT)
82#include <parsec/class/parsec_hash_table.h>
83#include <parsec/data_internal.h>
84#include <parsec/execution_stream.h>
85#include <parsec/interfaces/interface.h>
86#include <parsec/mca/device/device.h>
87#include <parsec/parsec_comm_engine.h>
88#include <parsec/parsec_internal.h>
89#include <parsec/scheduling.h>
90#include <parsec/remote_dep.h>
91#include <parsec/utils/mca_param.h>
93#ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
94#include <parsec/mca/device/cuda/device_cuda.h>
96#ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
97#include <parsec/mca/device/hip/device_hip.h>
99#ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
100#include <parsec/mca/device/level_zero/device_level_zero.h>
103#include <parsec/mca/device/device_gpu.h>
104#if defined(PARSEC_PROF_TRACE)
105#include <parsec/profiling.h>
106#undef PARSEC_TTG_PROFILE_BACKEND
107#if defined(PARSEC_PROF_GRAPHER)
108#include <parsec/parsec_prof_grapher.h>
114#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
115#include <unordered_set>
126#undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
// Operation identifier carried by this message; defaults to UINT64_MAX
// (presumably meaning "unset"). NOTE(review): presumably this is the value read
// as `msg->op_id` in static_unpack_msg — confirm against the enclosing struct.
150 uint64_t
op_id = std::numeric_limits<uint64_t>::max();
// Forward declaration; the definition appears later in this file. The shutdown
// code (the path that also calls parsec_fini()) both calls it directly with
// nullptr and passes it to parsec_context_at_fini(); the surrounding control
// flow is not fully visible here. The definition tests
// parsec_ce.tag_unregister, so it presumably unregisters the comm-engine tags
// registered at start-up. NOTE(review): confirm against the full definition.
171 static void unregister_parsec_tags(
void *_);
184 uint32_t taskpool_id,
189 :
tt_id(fn_id, taskpool_id,
tt_id, param_id, sender, num_keys)
195 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag,
void *data,
long unsigned int size,
196 int src_rank,
void *obj) {
198 parsec_taskpool_t *tp = NULL;
200 uint64_t op_id = msg->
op_id;
204 if (PARSEC_TERM_TP_NOT_READY != tp->tdm.module->taskpool_state(tp)) {
208 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
209 static_set_arg_fct = op_pair.first;
210 static_set_arg_fct(data, size, op_pair.second);
211 tp->tdm.module->incoming_message_end(tp, NULL);
213 }
catch (
const std::out_of_range &e)
216 auto data_cpy = std::make_unique_for_overwrite<std::byte[]>(size);
217 memcpy(data_cpy.get(), data, size);
219 ", ", op_id,
", ", data_cpy,
", ", size,
")");
220 delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, std::move(data_cpy), size)));
// Forward declaration of a comm-engine tag callback; defined later in this
// file. The definition reinterprets the leading std::intptr_t stored in `msg`
// as a pointer to a std::function<void(void)>.
225 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
226 int src,
void *cb_data);
// Static flag, initially false.
// NOTE(review): its readers/writers are not visible in this excerpt; consider a
// more descriptive name and a comment stating what it guards.
229 static bool im =
false;
// Task-profiling flag reported by profiling(); initialized to false in the
// constructor's member-initializer list.
240 bool _task_profiling;
242 mpi_space_support = {
true,
false,
false};
244 int query_comm_size() {
246 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
250 int query_comm_rank() {
252 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
256 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine,
void *user_data)
262 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine,
void *user_data)
269#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
// PaRSEC profiling keyword ids (start/end pairs) for backend events. The
// constructor fills each pair via parsec_profiling_add_dictionary_keyword()
// when parsec_profile_enabled is set.
// NOTE(review): near the top of the file, PARSEC_TTG_PROFILE_BACKEND is
// #undef'd inside `#if defined(PARSEC_PROF_TRACE)`. Unless it is redefined in
// lines not shown here, this block is always compiled out.
270 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
271 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
272 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
275 WorldImpl(
int *argc,
char **argv[],
int ncores, parsec_context_t *c =
nullptr)
278 , own_ctx(c == nullptr)
279#if defined(PARSEC_PROF_TRACE)
280 , profiling_array(nullptr)
281 , profiling_array_size(0)
283 , _dag_profiling(false)
284 , _task_profiling(false)
287 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
291#
if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
292 || MPIX_Query_cuda_support()
299#if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
300 || MPIX_Query_hip_support()
306#if defined(PARSEC_PROF_TRACE)
307 if(parsec_profile_enabled) {
309#if defined(PARSEC_TTG_PROFILE_BACKEND)
310 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_SET_ARG_IMPL",
"fill:000000", 0, NULL,
311 (
int*)&parsec_ttg_profile_backend_set_arg_start,
312 (
int*)&parsec_ttg_profile_backend_set_arg_end);
313 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_BCAST_ARG_IMPL",
"fill:000000", 0, NULL,
314 (
int*)&parsec_ttg_profile_backend_bcast_arg_start,
315 (
int*)&parsec_ttg_profile_backend_bcast_arg_end);
316 parsec_profiling_add_dictionary_keyword(
"PARSEC_TTG_DATACOPY",
"fill:000000",
317 sizeof(
size_t),
"size{int64_t}",
318 (
int*)&parsec_ttg_profile_backend_allocate_datacopy,
319 (
int*)&parsec_ttg_profile_backend_free_datacopy);
324#ifdef PARSEC_PROF_GRAPHER
327 dot_param_idx = parsec_mca_param_find(
"profile", NULL,
"dot");
329 if (dot_param_idx != PARSEC_ERROR) {
331 parsec_mca_param_lookup_string(dot_param_idx, &filename);
336 if( NULL != parsec_ce.tag_register) {
350 assert(
nullptr == tpool);
351 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
352 tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
354 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
355 tpool->taskpool_name = strdup(
"TTG Taskpool");
356 parsec_taskpool_reserve_id(tpool);
358 tpool->devices_index_mask = 0;
359 for(
int i = 0; i < (int)parsec_nb_devices; i++) {
360 parsec_device_module_t *device = parsec_mca_device_get(i);
361 if( NULL == device )
continue;
362 tpool->devices_index_mask |= (1 << device->device_index);
365#ifdef TTG_USE_USER_TERMDET
366 parsec_termdet_open_module(tpool,
"user_trigger");
368 parsec_termdet_open_dyn_module(tpool);
376 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
379#if defined(PARSEC_PROF_TRACE)
380 tpool->profiling_array = profiling_array;
389 parsec_taskpool_started =
false;
// MPI communicator used by this world. This implementation always returns
// MPI_COMM_WORLD.
409 MPI_Comm
comm()
const {
return MPI_COMM_WORLD; }
412 if (!parsec_taskpool_started) {
413 parsec_enqueue(ctx, tpool);
414 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
415 tpool->tdm.module->taskpool_ready(tpool);
416 [[maybe_unused]]
auto ret = parsec_context_start(ctx);
418 parsec_taskpool_started =
true;
423#if defined(PARSEC_PROF_TRACE)
427 tpool->profiling_array =
nullptr;
429 assert(NULL != tpool->tdm.monitor);
430 tpool->tdm.module->unmonitor_taskpool(tpool);
431 parsec_taskpool_free(tpool);
437 if (parsec_taskpool_started) {
439 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
440 ttg::trace(
"ttg_parsec(", this->
rank(),
"): final waiting for completion");
442 parsec_context_wait(ctx);
444 parsec_taskpool_wait(tpool);
450 unregister_parsec_tags(
nullptr);
452 parsec_context_at_fini(unregister_parsec_tags,
nullptr);
454#if defined(PARSEC_PROF_TRACE)
455 if(
nullptr != profiling_array) {
456 free(profiling_array);
457 profiling_array =
nullptr;
458 profiling_array_size = 0;
461 if (own_ctx) parsec_fini(&ctx);
477 virtual void dag_on(
const std::string &filename)
override {
478#if defined(PARSEC_PROF_GRAPHER)
479 if(!_dag_profiling) {
481 size_t len = strlen(filename.c_str())+32;
482 char ext_filename[len];
483 snprintf(ext_filename, len,
"%s-%d.dot", filename.c_str(),
rank());
484 parsec_prof_grapher_init(ctx, ext_filename);
485 _dag_profiling =
true;
488 ttg::print(
"Error: requested to create '", filename,
"' to create a DAG of tasks,\n"
489 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
494#if defined(PARSEC_PROF_GRAPHER)
496 parsec_prof_grapher_fini();
497 _dag_profiling =
false;
503#if defined(PARSEC_PROF_TRACE)
504 _task_profiling =
false;
509#if defined(PARSEC_PROF_TRACE)
510 _task_profiling =
true;
// Reports whether task profiling is currently enabled by returning
// _task_profiling. That member starts as false. In the code visible here it
// is only reassigned when PARSEC_PROF_TRACE is defined, so without PaRSEC
// tracing this presumably always returns false.
514 virtual bool profiling()
override {
return _task_profiling; }
517 return mpi_space_support[
static_cast<std::size_t
>(space)];
521#ifdef TTG_USE_USER_TERMDET
522 if(parsec_taskpool_started) {
524 parsec_taskpool_started =
false;
529 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
532#if defined(PARSEC_PROF_TRACE)
533 std::stringstream ss;
534 build_composite_name_rec(t->
ttg_ptr(), ss);
541#if defined(PARSEC_PROF_TRACE)
542 void build_composite_name_rec(
const ttg::TTBase *t, std::stringstream &ss) {
545 build_composite_name_rec(t->
ttg_ptr(), ss);
549 void register_new_profiling_event(
const char *name,
int position) {
550 if(2*position >= profiling_array_size) {
551 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
552 profiling_array = (
int*)realloc((
void*)profiling_array,
553 new_profiling_array_size *
sizeof(int));
554 memset((
void*)&profiling_array[profiling_array_size], 0,
sizeof(
int)*(new_profiling_array_size - profiling_array_size));
555 profiling_array_size = new_profiling_array_size;
556 tpool->profiling_array = profiling_array;
559 assert(0 == tpool->profiling_array[2*position]);
560 assert(0 == tpool->profiling_array[2*position+1]);
564 parsec_profiling_add_dictionary_keyword(name,
"fill:000000", 64,
"key{char[64]}",
565 (
int*)&tpool->profiling_array[2*position],
566 (
int*)&tpool->profiling_array[2*position+1]);
572 if (!parsec_taskpool_started) {
573 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool has not been started, fence is a simple MPI_Barrier");
577 ttg::trace(
"ttg_parsec::(",
rank,
"): parsec taskpool is ready for completion");
579 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
581 parsec_taskpool_wait(tpool);
// PaRSEC context. When the constructor is given no context, it creates one
// with parsec_init(). In that case (own_ctx) shutdown releases it with
// parsec_fini().
594 parsec_context_t *ctx =
nullptr;
// True when the constructor received c == nullptr. This object then owns
// `ctx` and is responsible for calling parsec_fini() on it.
595 bool own_ctx =
false;
// The TTG taskpool. It is allocated with PARSEC_OBJ_NEW (after asserting it
// is still null) and released with parsec_taskpool_free().
596 parsec_taskpool_t *tpool =
nullptr;
// Set to true once the taskpool has been enqueued and the context started.
// While false, the fence path only performs an MPI barrier (per its trace
// message).
597 bool parsec_taskpool_started =
false;
598#if defined(PARSEC_PROF_TRACE)
// Growable array of profiling keyword ids, two ints per registered event
// (indices 2*position and 2*position+1). It is grown with realloc() by
// register_new_profiling_event() and free()d on shutdown.
599 int *profiling_array;
// Allocated length of profiling_array, counted in ints (not events).
600 std::size_t profiling_array_size;
604 static void unregister_parsec_tags(
void *_pidx)
606 if(NULL != parsec_ce.tag_unregister) {
614 const parsec_symbol_t parsec_taskclass_param0 = {
615 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
622 const parsec_symbol_t parsec_taskclass_param1 = {
623 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
630 const parsec_symbol_t parsec_taskclass_param2 = {
631 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
638 const parsec_symbol_t parsec_taskclass_param3 = {
639 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
647 inline ttg_data_copy_t *find_copy_in_task(parsec_ttg_task_base_t *task,
const void *ptr) {
648 ttg_data_copy_t *res =
nullptr;
649 if (task ==
nullptr || ptr ==
nullptr) {
652 for (
int i = 0; i < task->data_count; ++i) {
653 auto copy =
static_cast<ttg_data_copy_t *
>(task->copies[i]);
654 if (NULL != copy && copy->get_ptr() == ptr) {
662 inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task,
const void *ptr) {
664 if (task ==
nullptr || ptr ==
nullptr) {
667 for (i = 0; i < task->data_count; ++i) {
668 auto copy =
static_cast<ttg_data_copy_t *
>(task->copies[i]);
669 if (NULL != copy && copy->get_ptr() == ptr) {
676 inline bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task) {
677 if (task ==
nullptr || copy ==
nullptr) {
681 if (MAX_PARAM_COUNT < task->data_count) {
682 throw std::logic_error(
"Too many data copies, check MAX_PARAM_COUNT!");
685 task->copies[task->data_count] = copy;
690 inline void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task) {
693 for (i = task->data_count-1; i >= 0; --i) {
694 if (copy == task->copies[i]) {
700 for (; i < task->data_count - 1; ++i) {
701 task->copies[i] = task->copies[i + 1];
704 task->copies[i] =
nullptr;
708#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
709#warning "ttg::PaRSEC enables data copy tracking"
// Debug-only registry of live data copies. Copies are inserted when a new data
// copy is created and erased in release_data_copy(); both accesses take
// pending_copies_mutex.
// NOTE(review): TTG_PARSEC_DEBUG_TRACK_DATA_COPIES is #undef'd near the
// includes; check whether this block can ever be compiled.
710 static std::unordered_set<ttg_data_copy_t *> pending_copies;
711 static std::mutex pending_copies_mutex;
713#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
// Source of per-copy uids for backend profiling events. Each new data copy
// takes the next value via parsec_atomic_fetch_inc_int64().
// NOTE(review): guarded by PARSEC_TTG_PROFILE_BACKEND, which is #undef'd under
// PARSEC_PROF_TRACE near the includes unless redefined elsewhere.
714 static int64_t parsec_ttg_data_copy_uid = 0;
717 template <
typename Value>
719 using value_type = std::decay_t<Value>;
721 if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type> &&
722 std::is_constructible_v<value_type,
decltype(value)>) {
723 copy =
new value_type(std::forward<Value>(value));
724 }
else if constexpr (std::is_constructible_v<ttg_data_value_copy_t<value_type>,
decltype(value)>) {
728 throw std::logic_error(
"Trying to copy-construct data that is not copy-constructible!");
730#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
733 copy->size =
sizeof(Value);
734 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
736 static_cast<uint64_t
>(copy->uid),
737 PROFILE_OBJECT_ID_NULL, ©->size,
738 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
741#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
743 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
744 auto rc = pending_copies.insert(copy);
745 assert(std::get<1>(rc));
752 template <std::size_t... IS,
typename Key = keyT>
754 int junk[] = {0, (invoke_pull_terminal<IS>(
755 std::get<IS>(input_terminals), key, task),
762 inline void transfer_ownership_impl(T&& arg,
int device) {
763 if constexpr(!std::is_const_v<std::remove_reference_t<T>>) {
765 parsec_data_transfer_ownership_to_copy(data, device, PARSEC_FLOW_ACCESS_RW);
767 data->device_copies[0]->version++;
772 template<
typename TT, std::size_t... Is>
773 inline void transfer_ownership(parsec_ttg_task_t<TT> *me,
int device, std::index_sequence<Is...>) {
776 (transfer_ownership_impl(
777 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<Is, typename TT::input_refs_tuple_type>
> *>(
778 me->copies[Is]->get_ptr()), device), 0)...};
782 template<
typename TT>
783 inline parsec_hook_return_t hook(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
784 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
785 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
786 transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
788 return me->template invoke_op<ttg::ExecutionSpace::Host>();
791 template<
typename TT>
792 inline parsec_hook_return_t hook_cuda(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
794 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
795 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
797 std::cerr <<
"CUDA hook called without having a CUDA op!" << std::endl;
798 return PARSEC_HOOK_RETURN_ERROR;
802 template<
typename TT>
803 inline parsec_hook_return_t hook_hip(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
805 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
806 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
808 std::cerr <<
"HIP hook called without having a HIP op!" << std::endl;
809 return PARSEC_HOOK_RETURN_ERROR;
813 template<
typename TT>
814 inline parsec_hook_return_t hook_level_zero(
struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
816 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
817 return me->template invoke_op<ttg::ExecutionSpace::L0>();
819 std::cerr <<
"L0 hook called without having a L0 op!" << std::endl;
820 return PARSEC_HOOK_RETURN_ERROR;
825 template<
typename TT>
826 inline parsec_hook_return_t evaluate_cuda(
const parsec_task_t *parsec_task) {
828 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
829 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
831 return PARSEC_HOOK_RETURN_NEXT;
835 template<
typename TT>
836 inline parsec_hook_return_t evaluate_hip(
const parsec_task_t *parsec_task) {
838 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
839 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
841 return PARSEC_HOOK_RETURN_NEXT;
845 template<
typename TT>
846 inline parsec_hook_return_t evaluate_level_zero(
const parsec_task_t *parsec_task) {
848 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
849 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
851 return PARSEC_HOOK_RETURN_NEXT;
856 template <
typename KeyT,
typename ActivationCallbackT>
857 class rma_delayed_activate {
858 std::vector<KeyT> _keylist;
859 std::atomic<int> _outstanding_transfers;
860 ActivationCallbackT _cb;
861 detail::ttg_data_copy_t *_copy;
864 rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy,
int num_transfers, ActivationCallbackT cb)
865 : _keylist(
std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
867 bool complete_transfer(
void) {
868 assert(_outstanding_transfers > 0);
869 int left = --_outstanding_transfers;
871 _cb(std::move(_keylist), _copy);
878 template <
typename ActivationT>
879 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
880 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl,
size_t size,
int remote,
882 parsec_ce.mem_unregister(&lreg);
883 ActivationT *activation =
static_cast<ActivationT *
>(cb_data);
884 if (activation->complete_transfer()) {
887 return PARSEC_SUCCESS;
890 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
891 int src,
void *cb_data) {
892 std::intptr_t *fn_ptr =
static_cast<std::intptr_t *
>(msg);
893 std::function<void(
void)> *fn =
reinterpret_cast<std::function<
void(
void)
> *>(*fn_ptr);
896 return PARSEC_SUCCESS;
899 template <
typename FuncT>
900 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
void *msg,
size_t msg_size,
901 int src,
void *cb_data) {
902 std::intptr_t *iptr =
static_cast<std::intptr_t *
>(msg);
903 FuncT *fn_ptr =
reinterpret_cast<FuncT *
>(*iptr);
906 return PARSEC_SUCCESS;
909 inline void release_data_copy(ttg_data_copy_t *copy) {
910 if (copy->is_mutable() &&
nullptr == copy->get_next_task()) {
913 copy->reset_readers();
916 int32_t readers = copy->num_readers();
919 readers = copy->decrement_readers();
920 }
else if (readers == 1) {
922 readers = copy->decrement_readers<
false>();
926 if (1 == readers || readers == copy->mutable_tag) {
927 std::atomic_thread_fence(std::memory_order_acquire);
928 if (
nullptr != copy->get_next_task()) {
933 parsec_task_t *next_task = copy->get_next_task();
934 copy->set_next_task(
nullptr);
935 parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
936 copy->mark_mutable();
937 deferred_op->release_task();
938 }
else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
940#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
942 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
943 size_t rc = pending_copies.erase(copy);
947#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
951 static_cast<uint64_t
>(copy->uid),
952 PROFILE_OBJECT_ID_NULL, ©->
size,
953 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
961 template <
typename Value>
962 inline ttg_data_copy_t *register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task,
bool readonly) {
963 ttg_data_copy_t *copy_res = copy_in;
964 bool replace =
false;
966 assert(readers != 0);
968 if (readonly && !copy_in->is_mutable()) {
970 readers = copy_in->increment_readers();
973 if (readers == copy_in->mutable_tag) {
974 if (copy_res->get_next_task() !=
nullptr) {
976 parsec_ttg_task_base_t *next_task =
reinterpret_cast<parsec_ttg_task_base_t *
>(copy_res->get_next_task());
977 if (next_task->defer_writer) {
992 }
else if (!readonly) {
1008 bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) ||
1009 ((
nullptr != task) && task->defer_writer);
1011 if (1 == copy_in->num_readers() && !defer_writer) {
1016 assert(
nullptr == copy_in->get_next_task());
1017 copy_in->set_next_task(&task->parsec_task);
1018 std::atomic_thread_fence(std::memory_order_release);
1019 copy_in->mark_mutable();
1021 if (defer_writer &&
nullptr == copy_in->get_next_task()) {
1023 copy_res->set_next_task(&task->parsec_task);
1024 task->defer_writer =
true;
1032 if (NULL == copy_res) {
1034 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1036 if (replace &&
nullptr != copy_in->get_next_task()) {
1038 parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
1039 new_copy->mark_mutable();
1041 for (
int i = 0; i < deferred_op->data_count; ++i) {
1042 if (deferred_op->copies[i] == copy_in) {
1043 deferred_op->copies[i] = new_copy;
1047 copy_in->set_next_task(
nullptr);
1048 deferred_op->release_task();
1049 copy_in->reset_readers();
1050 copy_in->increment_readers<
false>();
1054 new_copy->mark_mutable();
1056 copy_res = new_copy;
1060 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") +
typeid(std::decay_t<Value>).name() +
" but the type is not copyable");
1068 inline void ttg_initialize(
int argc,
char **argv,
int num_threads, parsec_context_t *ctx) {
1069 if (
detail::initialized_mpi())
throw std::runtime_error(
"ttg_parsec::ttg_initialize: can only be called once");
1072 int mpi_initialized;
1073 MPI_Initialized(&mpi_initialized);
1074 if (!mpi_initialized) {
1076 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
1078 throw std::runtime_error(
"ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1087 ttg::detail::set_default_world(std::move(world));
1091 for (
int i = 0; i < parsec_nb_devices; ++i) {
1092 bool is_gpu = parsec_mca_device_is_gpu(i);
1096 throw std::runtime_error(
"PaRSEC: Found non-GPU device in GPU ID range!");
1101 const char* ttg_max_inline_cstr = std::getenv(
"TTG_MAX_INLINE");
1102 if (
nullptr != ttg_max_inline_cstr) {
1103 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
1109 bool all_peer_access =
true;
1111 for (
int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1112 parsec_device_module_t *idevice = parsec_mca_device_get(i);
1113 if (PARSEC_DEV_IS_GPU(idevice->type)) {
1114 parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
1115 for (
int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1117 parsec_device_module_t *jdevice = parsec_mca_device_get(j);
1118 if (PARSEC_DEV_IS_GPU(jdevice->type)) {
1119 all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true :
false;
1132 ttg::detail::set_default_world(
ttg::World{});
1134 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1143 template <
typename T>
1145 world.
impl().register_ptr(
ptr);
1148 template <
typename T>
1150 world.
impl().register_ptr(std::move(
ptr));
1154 world.
impl().register_status(status_ptr);
1157 template <
typename Callback>
1159 world.
impl().register_callback(std::forward<Callback>(callback));
1165 double result = 0.0;
1166 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.
impl().comm());
1171 MPI_Barrier(world.
impl().comm());
1176 template <
typename T>
1179 if (world.
rank() == source_rank) {
1182 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.
impl().comm());
1184 unsigned char *buf =
new unsigned char[BUFLEN];
1185 if (world.
rank() == source_rank) {
1188 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.
impl().comm());
1189 if (world.
rank() != source_rank) {
1207 template <
typename keyT,
typename output_terminalsT,
typename derivedT,
typename input_valueTs, ttg::ExecutionSpace Space>
1212 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1214 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1218 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
// Detection aliases. Each names the type of a member of T (have_cuda_op,
// have_hip_op, have_level_zero_op). Each is ill-formed when T lacks that
// member, which allows SFINAE-style detection. They are presumably used with
// std::experimental::is_detected from <experimental/type_traits>; the uses are
// not visible here.
1225 template <
typename T>
1226 using have_cuda_op_non_type_t =
decltype(T::have_cuda_op);
1228 template <
typename T>
1229 using have_hip_op_non_type_t =
decltype(T::have_hip_op);
1231 template <
typename T>
1232 using have_level_zero_op_non_type_t =
decltype(T::have_level_zero_op);
// Compile-time arity of this TT:
//   numinedges - element count of input_tuple_type (presumably every input terminal)
//   numins     - element count of actual_input_tuple_type
//   numouts    - number of output terminals (element count of output_terminalsT)
//   numflows   - the larger of numins and numouts
1236 static constexpr int numinedges = std::tuple_size_v<input_tuple_type>;
1237 static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>;
1238 static constexpr int numouts = std::tuple_size_v<output_terminalsT>;
1239 static constexpr int numflows = std::max(numins, numouts);
1243 template<
typename DerivedT = derivedT>
1249 template<
typename DerivedT = derivedT>
1255 template<
typename DerivedT = derivedT>
1261 template<
typename DerivedT = derivedT>
1270 "Data sent from a device-capable template task must be serializable.");
1286 std::tuple_size_v<input_refs_tuple_type>;
1292 template <std::
size_t i,
typename resultT,
typename InTuple>
1294 return static_cast<resultT>(std::get<i>(std::forward<InTuple>(
intuple)));
1296 template <std::
size_t i,
typename InTuple>
1298 return std::get<i>(std::forward<InTuple>(
intuple));
1308 constexpr static const size_t task_key_offset =
sizeof(task_t);
1317 template <std::size_t...
IS>
1318 static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
1319 using resultT =
decltype(set_arg_from_msg_fcts);
1322 constexpr static std::array<
void (TT::*)(
void *,
std::
size_t), numins> set_arg_from_msg_fcts =
1325 template <std::size_t...
IS>
1326 static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
1327 using resultT =
decltype(set_argstream_size_from_msg_fcts);
1330 constexpr static std::array<
void (TT::*)(
void *,
std::
size_t), numins> set_argstream_size_from_msg_fcts =
1333 template <std::size_t...
IS>
1334 static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1335 using resultT =
decltype(finalize_argstream_from_msg_fcts);
1338 constexpr static std::array<
void (TT::*)(
void *,
std::
size_t), numins> finalize_argstream_from_msg_fcts =
1341 template <std::size_t...
IS>
1342 static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1343 using resultT =
decltype(get_from_pull_msg_fcts);
1346 constexpr static std::array<
void (TT::*)(
void *,
std::
size_t), numinedges> get_from_pull_msg_fcts =
1349 template<std::size_t...
IS>
1350 constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
1351 using resultT =
decltype(input_is_const);
1352 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
// Compile-time table: input_is_const[i] is true iff the i-th element of
// input_args_type is const-qualified. It is built by make_input_is_const().
1354 constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
// Per-key mapping callbacks. keymap and priomap are keymap_t<keyT>;
// devicemap is keymap_t<keyT, ttg::device::Device>.
// NOTE(review): presumably these map a key to its owning rank, its task
// priority and its execution device respectively; confirm against keymap_t
// and the corresponding setters.
1357 ttg::meta::detail::keymap_t<keyT> keymap;
1358 ttg::meta::detail::keymap_t<keyT> priomap;
1359 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1361 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
// One task-class slot per input (by name, for input reducers). Every slot
// starts as nullptr, because omitted aggregate elements are value-initialized.
// NOTE(review): the identifier is spelled "inpute_..."; renaming it requires
// updating every use.
1363 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = {
nullptr };
// Static stream-size goal per input.
// NOTE(review): this aggregate initializer sets only element 0 to SIZE_MAX.
// Elements 1..numins-1 are value-initialized to 0. If SIZE_MAX is meant as
// the "no static goal" default for every input, all elements should be
// filled (e.g. with std::array::fill).
1364 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
// NOTE(review): presumably the number of pull-terminal inputs; confirm with
// its uses.
1365 int num_pullins = 0;
// Constraint callbacks taking keyT. Presumably the first list is consulted
// before a task may run and the second when it completes. Confirm with the
// code that registers and invokes them.
1369 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1370 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1379 template <
typename...
Args>
1380 auto op(
Args &&...args) {
1382 using return_type =
decltype(derived->op(std::forward<Args>(args)...));
1383 if constexpr (std::is_same_v<return_type,void>) {
1384 derived->op(std::forward<Args>(args)...);
1388 return derived->op(std::forward<Args>(args)...);
1392 template <std::
size_t i,
typename terminalT,
typename Key>
1393 void invoke_pull_terminal(
terminalT &
in,
const Key &key, detail::parsec_ttg_task_base_t *
task) {
1394 if (
in.is_pull_terminal) {
1395 auto owner =
in.container.owner(key);
1396 if (owner != world.rank()) {
1405 template <std::
size_t i,
typename Key>
1406 void get_pull_terminal_data_from(
const int owner,
1408 using msg_t = detail::msg_t;
1417 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
1418 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
1420 sizeof(msg_header_t) +
pos);
1423 template <std::size_t...
IS,
typename Key =
keyT>
1424 void invoke_pull_terminals(std::index_sequence<IS...>,
const Key &key, detail::parsec_ttg_task_base_t *
task) {
1426 std::get<IS>(input_terminals), key,
task),
1431 template <std::size_t...
IS>
1434 *
reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>
> *>(
1435 task->copies[
IS]->get_ptr()))...};
1438#ifdef TTG_HAVE_DEVICE
1446 task_t *
task = (task_t*)gpu_task->ec;
1448 ttg::device::Task
dev_task = ttg::device::detail::device_task_handle_type::from_address(
task->suspended_task_address);
1457 if (
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
1464 assert(
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1465 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1467#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1473#elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1479#elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1489 static_op(&
task->parsec_task);
1498 task->parsec_task.data[
i].data_out->readers = 1;
1505 if (
nullptr !=
task->suspended_task_address) {
1507 dev_task = ttg::device::detail::device_task_handle_type::from_address(
task->suspended_task_address);
1510 assert(
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1511 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1512 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1514 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ==
dev_data.state() ||
1515 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE ==
dev_data.state()) {
1533 task_t *
task = (task_t*)parsec_task;
1534 if (
task->dev_ptr->gpu_task ==
nullptr) {
1541 gpu_task->ec = parsec_task;
1542 gpu_task->task_type = 0;
1543 gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max();
1544 gpu_task->pushout = 0;
1549 gpu_task->flow[
i] = &flows[
i];
1555 .flow_datatype_mask = ~0 };
1565 task->dev_ptr->gpu_task = gpu_task;
1571 task->dev_ptr->task_class = *
task->parsec_task.task_class;
1574 static_op(parsec_task);
1582 tc.in[
i] = gpu_task->flow[
i];
1583 tc.out[
i] = gpu_task->flow[
i];
1589 if (tt->devicemap) {
1591 if constexpr (std::is_void_v<keyT>) {
1603 if (
data->owner_device == 0) {
1611 task->parsec_task.task_class = &
task->dev_ptr->task_class;
1617 std::cerr <<
"EVALUATE called on task with assigned GPU task!" << std::endl;
1629 task_t *
task = (task_t*)parsec_task;
1631 if (
nullptr ==
task->suspended_task_address) {
1637 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(
task->suspended_task_address);
1640 ttg::device::detail::device_task_promise_type&
dev_data =
dev_task.promise();
1644 if (
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
1647 if (gpu_task->pushout == 0) {
1652 if (
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1660 task->dev_ptr->device = device;
1663 switch(device->super.type) {
1665#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1676#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1685#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
1697 ttg::print_error(
task->tt->get_name(),
" : received mismatching device type ", (
int)device->super.type,
" from PaRSEC");
1705 task_t *
task = (task_t*)parsec_task;
1706 void* suspended_task_address =
1707#ifdef TTG_HAVE_COROUTINE
1708 task->suspended_task_address;
1713 if (suspended_task_address ==
nullptr) {
1719 if (
obj->tracing()) {
1727 auto input = make_tuple_of_ref_from_array(
task, std::make_index_sequence<numinvals>{});
1732 auto input = make_tuple_of_ref_from_array(
task, std::make_index_sequence<numinvals>{});
1743#ifdef TTG_HAVE_COROUTINE
1746#ifdef TTG_HAVE_DEVICE
1748 ttg::device::Task
coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1753 task->tt->set_outputs_tls_ptr();
1755 if (
coro.completed()) {
1757 suspended_task_address =
nullptr;
1767 task->tt->set_outputs_tls_ptr();
1769 if (
ret.completed()) {
1771 suspended_task_address =
nullptr;
1786 task->suspended_task_address = suspended_task_address;
1794#ifdef TTG_HAVE_COROUTINE
1795 task->suspended_task_address = suspended_task_address;
1797 if (suspended_task_address ==
nullptr) {
1800 if (
obj->tracing()) {
1802 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : ",
task->key,
": done executing");
1804 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : done executing");
1812 task_t *
task =
static_cast<task_t*
>(parsec_task);
1814 void* suspended_task_address =
1815#ifdef TTG_HAVE_COROUTINE
1816 task->suspended_task_address;
1820 if (suspended_task_address ==
nullptr) {
1834#ifdef TTG_HAVE_COROUTINE
1838 if (
ret.completed()) {
1840 suspended_task_address =
nullptr;
1849 task->suspended_task_address = suspended_task_address;
1851 if (suspended_task_address) {
1860 template <std::
size_t i>
1862 using rtask_t = detail::reducer_task_t;
1863 using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1865 constexpr const bool input_is_const = std::is_const_v<value_t>;
1867 task_t *parent_task =
static_cast<task_t*
>(
rtask->parent_task);
1875 if (
obj->tracing()) {
1877 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : ", parent_task->key,
": reducer executing");
1879 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : reducer executing");
1888 std::size_t
size = 0;
1889 assert(parent_task->streams[
i].reduce_count > 0);
1890 if (
rtask->is_first) {
1891 if (0 == (parent_task->streams[
i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1893 if (
obj->tracing()) {
1895 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : ", parent_task->key,
": first reducer empty");
1897 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : first reducer empty");
1913 if (
nullptr ==
item) {
1921 *
reinterpret_cast<std::decay_t<value_t> *
>(
source_copy->get_ptr()));
1927 size = ++parent_task->streams[
i].size;
1929 }
while ((c = (parent_task->streams[
i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1933 bool complete = (
size >= parent_task->streams[
i].goal);
1938 if (complete && c == 0) {
1939 if constexpr(input_is_const) {
1944 parent_task->remove_from_hash =
true;
1945 parent_task->release_task(parent_task);
1950 if (
obj->tracing()) {
1952 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : ", parent_task->key,
": done executing");
1954 ttg::trace(
obj->get_world().rank(),
":",
obj->get_name(),
" : done executing");
1962 template <
typename T>
1966 if constexpr (!dd_t::serialize_size_is_const) {
1969 payload_size = dd_t::payload_size(&
obj);
1975 template <
typename T>
1979 if constexpr (!dd_t::serialize_size_is_const) {
1982 pos = dd_t::pack_payload(&
obj, payload_size,
pos, bytes);
1988 "Trying to unpack as message that does not hold enough bytes to represent a single header");
1991 switch (
hd->fn_id) {
1993 if (0 <=
hd->param_id) {
1995 assert(
hd->param_id <
obj->set_arg_from_msg_fcts.size());
1996 auto member =
obj->set_arg_from_msg_fcts[
hd->param_id];
2006 assert(
hd->param_id <
obj->set_argstream_size_from_msg_fcts.size());
2007 auto member =
obj->set_argstream_size_from_msg_fcts[
hd->param_id];
2013 assert(
hd->param_id <
obj->finalize_argstream_from_msg_fcts.size());
2014 auto member =
obj->finalize_argstream_from_msg_fcts[
hd->param_id];
2020 assert(
hd->param_id <
obj->get_from_pull_msg_fcts.size());
2021 auto member =
obj->get_from_pull_msg_fcts[
hd->param_id];
2034 int index = (
es->virtual_process->vp_id *
es->virtual_process->nb_cores +
es->th_id);
2035 return &mempools.thread_mempools[index];
2038 template <
size_t i,
typename valueT>
2052 dummy->
parsec_task.taskpool = world.impl().taskpool();
2062 if constexpr (std::is_copy_constructible_v<valueT>) {
2067 static_assert(!std::is_reference_v<valueT>);
2071 throw std::logic_error(std::string(
"TTG::PaRSEC: need to copy a datum of type") +
typeid(std::decay_t<valueT>).name() +
" but the type is not copyable");
2099 template <std::
size_t i>
2101 using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2103 msg_t *
msg =
static_cast<msg_t *
>(data);
2110 int num_keys =
msg->tt_id.num_keys;
2112 auto rank = world.rank();
2113 for (
int k = 0;
k < num_keys; ++
k) {
2116 assert(keymap(key) == rank);
2117 keylist.push_back(std::move(key));
2130 using metadata_t =
decltype(
descr.get_metadata(std::declval<decvalueT>()));
2145 if (num_iovecs == 0) {
2156 bool inline_data =
msg->tt_id.inline_data;
2168 auto activation =
new detail::rma_delayed_activate(
2171 this->world.impl().decrement_inflight_msg();
2183 std::memcpy(iovec.data,
msg->bytes +
pos, iovec.num_bytes);
2184 pos += iovec.num_bytes;
2187 using ActivationT = std::decay_t<
decltype(*activation)>;
2207 world.impl().increment_inflight_msg();
2211 &detail::get_complete_cb<ActivationT>,
activation,
2222 if (inline_data || !world.impl().mpi_support(
Space)) {
2224 }
else if (!
keylist.empty() && devicemap) {
2225 device = devicemap(
keylist[0]);
2234 buffer.allocate_on(device);
2235 buffer.set_owner_device(device);
2237 }
catch (
const std::bad_alloc&) {
2238 device = device.
cycle();
2304 template <std::
size_t i>
2307 msg_t *
msg =
static_cast<msg_t *
>(data);
2311 auto rank = world.rank();
2314 assert(keymap(key) == rank);
2317 auto rank = world.rank();
2318 assert(keymap() == rank);
2323 template <std::
size_t i>
2326 auto msg =
static_cast<msg_t *
>(data);
2330 auto rank = world.rank();
2333 assert(keymap(key) == rank);
2338 auto rank = world.rank();
2339 assert(keymap() == rank);
2346 template <std::
size_t i>
2349 msg_t *
msg =
static_cast<msg_t *
>(data);
2350 auto &
in = std::get<i>(input_terminals);
2360 template <std::
size_t i,
typename Key,
typename Value>
2361 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2366 template <std::
size_t i,
typename Key = keyT,
typename Value>
2367 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2372 template <std::
size_t i,
typename Key,
typename Value>
2373 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2374 const Key &key,
const Value &value) {
2378 template <std::
size_t i,
typename Key = keyT,
typename Value>
2379 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2380 const Value &value) {
2384 template <std::
size_t i,
typename Key = keyT,
typename Value>
2385 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg_local(
2386 std::shared_ptr<const Value> &
valueptr) {
2390 template <
typename Key>
2403 priority = priomap();
2408 for (
int i = 0;
i < static_stream_goal.size(); ++
i) {
2417 template <std::
size_t i>
2444 template <std::
size_t i,
typename Key,
typename Value>
2447 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2448 constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2453 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": received value for argument : ",
i);
2458 assert(keymap(key) == world.rank());
2463 auto &
reducer = std::get<i>(input_reducers);
2465 bool remove_from_hash =
true;
2466#if defined(PARSEC_PROF_GRAPHER)
2481#if defined(PARSEC_PROF_GRAPHER)
2483 key_hash(make_key(
task->parsec_task.taskpool,
task->parsec_task.locals),
nullptr));
2486 }
else if (!
reducer && numins == (
task->in_data_count + 1)) {
2489 remove_from_hash =
false;
2499 remove_from_hash =
false;
2501#if defined(PARSEC_PROF_GRAPHER)
2503 key_hash(make_key(
task->parsec_task.taskpool,
task->parsec_task.locals),
nullptr));
2509#if defined(PARSEC_PROF_GRAPHER)
2521 .flow_index = 0, .flow_datatype_mask = ~0 };
2523 .flow_index = 0, .flow_datatype_mask = ~0 };
2534 if (
nullptr !=
copy) {
2536 copy = detail::register_data_copy<valueT>(
copy,
task, is_const);
2541 copy->mark_mutable();
2550 std::size_t c = parent_task->streams[
i].reduce_count.fetch_add(1, std::memory_order_acquire);
2563 if (
nullptr == (
copy =
task->copies[
i])) {
2572 task->streams[
i].size = 1;
2573 task->streams[
i].reduce_count.store(1, std::memory_order_relaxed);
2619 if (
nullptr !=
task->copies[
i]) {
2621 throw std::logic_error(
"bad set arg");
2636 task->remove_from_hash = remove_from_hash;
2643 invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{},
task->key,
task);
2650 if (constraints_check.size() > 0) {
2664 template<
typename Key = keyT>
2669 for (std::size_t
i =
cid+1;
i < constraints_check.size();
i++) {
2670 if (!constraints_check[
i]()) {
2688 template<
typename Key = keyT>
2692 for (
auto& key : keys) {
2695 for (std::size_t
i =
cid+1;
i < constraints_check.size();
i++) {
2696 if (!constraints_check[
i](key)) {
2743 if (
count == numins) {
2771 baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{},
task->key,
task);
2777 template <std::
size_t i,
typename Key,
typename Value>
2778 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
const Key &key,
2784 template <std::
size_t i,
typename Key,
typename Value>
2785 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
void>
set_arg(
Value &&value) {
2789 template <std::
size_t i,
typename Key = keyT>
2790 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_arg() {
2795 template <std::
size_t i,
typename Key>
2796 std::enable_if_t<!ttg::meta::is_void_v<Key>,
void>
set_arg(
const Key &key) {
2800 template<
typename Value,
typename Key>
2808 bool inline_data =
false;
2816 [](std::size_t
s,
auto&
iov){ return s + iov.num_bytes; });
2834 template <std::
size_t i,
typename Key,
typename Value>
2838 using norefvalueT = std::remove_reference_t<Value>;
2841#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2842 if(world.impl().profiling()) {
2848 owner = keymap(key);
2851 if (owner == world.rank()) {
2856#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2857 if(world.impl().profiling()) {
2877 if (
nullptr ==
copy) {
2879 if (
nullptr ==
copy) {
2888 msg->tt_id.inline_data = inline_data;
2904 std::memcpy(
msg->bytes +
pos, iovec.data, iovec.num_bytes);
2905 pos += iovec.num_bytes;
2912 copy = detail::register_data_copy<decvalueT>(
copy,
nullptr,
true);
2933 std::function<
void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
2937 detail::release_data_copy(
copy);
2939 std::intptr_t
fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
2948 num_iovecs = std::distance(std::begin(
iovs), std::end(
iovs));
2969 std::tie(device, device_copy) = detail::find_device_copy(data);
2976 msg->tt_id.num_iovecs = num_iovecs;
2980 msg->tt_id.num_keys = 0;
2981 msg->tt_id.key_offset =
pos;
2985 msg->tt_id.num_keys = 1;
2989 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
2990 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
2994#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2995 if(world.impl().profiling()) {
2999#if defined(PARSEC_PROF_GRAPHER)
3011 .flow_index = 0, .flow_datatype_mask = ~0 };
3013 .flow_index = 0, .flow_datatype_mask = ~0 };
3021 template <
int i,
typename Iterator,
typename Value>
3023#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
3024 if(world.impl().profiling()) {
3034 for (
auto it = begin;
it != end; ++
it) {
3042#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
3043 if(world.impl().profiling()) {
3049 template <std::
size_t i,
typename Key,
typename Value>
3050 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3053 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
3055 auto np = world.size();
3056 int rank = world.rank();
3059 [&](
const Key &key) { return keymap(key) != rank; });
3067 int rank_a = keymap(a);
3068 int rank_b = keymap(b);
3070 int pos_a = (rank_a + np - rank) % np;
3071 int pos_b = (rank_b + np - rank) % np;
3072 return pos_a < pos_b;
3093 msg->tt_id.inline_data = inline_data;
3095 std::vector<std::pair<int32_t, std::shared_ptr<void>>>
memregs;
3109 std::memcpy(
msg->bytes +
pos, iovec.data, iovec.num_bytes);
3110 pos += iovec.num_bytes;
3120 std::shared_ptr<void>{
lreg,
3160 std::tie(device, device_copy) = detail::find_device_copy(data);
3163 data->device_copies[device]->device_private},
3175 auto owner = keymap(*
it);
3176 if (owner == rank) {
3180 std::find_if_not(++
it,
keylist_sorted.end(), [&](
const Key &key) { return keymap(key) == rank; });
3204 copy = detail::register_data_copy<valueT>(
copy,
nullptr,
true);
3206 std::function<
void(
void)> *fn =
new std::function<void(void)>([=]()
mutable {
3210 detail::release_data_copy(
copy);
3212 std::intptr_t
fn_ptr{
reinterpret_cast<std::intptr_t
>(fn)};
3219 msg->tt_id.key_offset =
pos;
3228 msg->tt_id.num_keys = num_keys;
3230 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
3231 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
3247 template <
typename Key,
typename...
Ts,
size_t...
Is,
size_t...
Js>
3248 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>,
3249 std::index_sequence<Js...>,
const Key &key,
3250 const std::tuple<Ts...> &args) {
3251 static_assert(
sizeof...(Js) ==
sizeof...(
Is));
3252 constexpr size_t js[] = {
Js...};
3259 template <
typename Key,
typename...
Ts,
size_t...
Is>
3260 std::enable_if_t<ttg::meta::is_none_void_v<Key>,
void>
set_args(std::index_sequence<Is...>
is,
const Key &key,
3261 const std::tuple<Ts...> &args) {
3262 set_args(std::index_sequence_for<Ts...>{},
is, key, args);
3268 template <
typename Key =
keyT,
typename...
Ts,
size_t...
Is,
size_t...
Js>
3269 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3270 const std::tuple<Ts...> &args) {
3271 static_assert(
sizeof...(Js) ==
sizeof...(
Is));
3272 constexpr size_t js[] = {
Js...};
3279 template <
typename Key =
keyT,
typename...
Ts,
size_t...
Is>
3280 std::enable_if_t<ttg::meta::is_void_v<Key>,
void>
set_args(std::index_sequence<Is...>
is,
3281 const std::tuple<Ts...> &args) {
3282 set_args(std::index_sequence_for<Ts...>{},
is, args);
3288 template <std::
size_t i>
3290 assert(std::get<i>(input_reducers) &&
"TT::set_static_argstream_size called on nonstreaming input terminal");
3291 assert(size > 0 &&
"TT::set_static_argstream_size(key,size) called with size=0");
3293 this->
trace(world.rank(),
":",
get_name(),
": setting global stream size for terminal ",
i);
3296 if (static_stream_goal[
i] < std::numeric_limits<std::size_t>::max()) {
3298 throw std::runtime_error(
"TT::set_static_argstream_size called for a bounded stream");
3301 static_stream_goal[
i] = size;
3307 template <std::
size_t i,
typename Key>
3310 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3311 assert(size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3314 const auto owner = keymap(key);
3315 if (owner != world.rank()) {
3316 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : forwarding stream size for terminal ",
i);
3327 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
3328 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
3332 ttg::trace(world.rank(),
":",
get_name(),
":", key,
" : setting stream size to ", size,
" for terminal ",
i);
3339 world.impl().increment_created();
3341 if( world.impl().dag_profiling() ) {
3342#if defined(PARSEC_PROF_GRAPHER)
3357 task->streams[
i].reduce_count.fetch_add(1, std::memory_order_acquire);
3358 task->streams[
i].goal = size;
3359 auto c =
task->streams[
i].reduce_count.fetch_sub(1, std::memory_order_release);
3360 if (1 == c && (
task->streams[
i].size >= size)) {
3368 template <std::
size_t i,
typename Key = keyT>
3371 assert(std::get<i>(input_reducers) &&
"TT::set_argstream_size called on nonstreaming input terminal");
3372 assert(size > 0 &&
"TT::set_argstream_size(key,size) called with size=0");
3375 const auto owner = keymap();
3376 if (owner != world.rank()) {
3386 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
3387 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
3391 ttg::trace(world.rank(),
":",
get_name(),
" : setting stream size to ", size,
" for terminal ",
i);
3398 world.impl().increment_created();
3400 if( world.impl().dag_profiling() ) {
3401#if defined(PARSEC_PROF_GRAPHER)
3416 task->streams[
i].reduce_count.fetch_add(1, std::memory_order_acquire);
3417 task->streams[
i].goal = size;
3418 auto c =
task->streams[
i].reduce_count.fetch_sub(1, std::memory_order_release);
3419 if (1 == c && (
task->streams[
i].size >= size)) {
3426 template <std::
size_t i,
typename Key>
3429 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3432 const auto owner = keymap(key);
3433 if (owner != world.rank()) {
3434 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": forwarding stream finalize for terminal ",
i);
3444 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
3445 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
3449 ttg::trace(world.rank(),
":",
get_name(),
" : ", key,
": finalizing stream for terminal ",
i);
3456 " : error finalize called on stream that never received an input data: ",
i);
3457 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3468 task->streams[
i].reduce_count.fetch_add(1, std::memory_order_acquire);
3469 task->streams[
i].goal = 1;
3470 auto c =
task->streams[
i].reduce_count.fetch_sub(1, std::memory_order_release);
3471 if (1 == c && (
task->streams[
i].size >= 1)) {
3478 template <std::
size_t i,
bool key_is_
void = ttg::meta::is_
void_v<keyT>>
3481 assert(std::get<i>(input_reducers) &&
"TT::finalize_argstream called on nonstreaming input terminal");
3484 const auto owner = keymap();
3485 if (owner != world.rank()) {
3486 ttg::trace(world.rank(),
":",
get_name(),
": forwarding stream finalize for terminal ",
i);
3494 tp->tdm.module->outgoing_message_start(
tp, owner,
NULL);
3495 tp->tdm.module->outgoing_message_pack(
tp, owner,
NULL,
NULL, 0);
3505 " : error finalize called on stream that never received an input data: ",
i);
3506 throw std::runtime_error(
"TT::finalize called on stream that never received an input data");
3517 task->streams[
i].reduce_count.fetch_add(1, std::memory_order_acquire);
3518 task->streams[
i].goal = 1;
3519 auto c =
task->streams[
i].reduce_count.fetch_sub(1, std::memory_order_release);
3520 if (1 == c && (
task->streams[
i].size >= 1)) {
3526 template<
typename Value>
3532 if (data->owner_device != 0) {
3536 gpu_task->flow[
flowidx] !=
nullptr &&
3545 throw std::runtime_error(
"Cannot add more than MAX_PARAM_COUNT flows to a task!");
3551 gpu_task->flow_nb_elts[
flowidx] = data->nb_elts;
3555 gpu_task->pushout |= 1<<
flowidx;
3566 template <std::
size_t i,
typename Value,
typename RemoteCheckFn>
3567 std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3570 constexpr const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
3577 if (
nullptr ==
copy) {
3590 auto &
reducer = std::get<i>(input_reducers);
3655 template <std::
size_t i,
typename Key,
typename Value>
3656 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3661 int rank = world.rank();
3663 [&](
const Key &key) { return keymap(key) != rank; });
3669 template <std::
size_t i,
typename Key,
typename Value>
3670 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3675 int rank = world.rank();
3676 return (keymap() != rank);
3692 TT &operator=(
const TT &
other) =
delete;
3697 template <
typename terminalT, std::
size_t i>
3699 using valueT = std::decay_t<typename terminalT::value_type>;
3700 if (
input.is_pull_terminal) {
3707 auto move_callback = [
this](
const keyT &key,
valueT &&value) {
3710 auto send_callback = [
this](
const keyT &key,
const valueT &value) {
3713 auto broadcast_callback = [
this](
const ttg::span<const keyT> &
keylist,
const valueT &value) {
3716 auto prepare_send_callback = [
this](
const ttg::span<const keyT> &
keylist,
const valueT &value) {
3721 input.set_callback(send_callback, move_callback, broadcast_callback,
3722 setsize_callback, finalize_callback, prepare_send_callback);
3731 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3742 auto send_callback = [
this](
const valueT &value) {
3743 if constexpr (std::is_copy_constructible_v<valueT>) {
3747 throw std::logic_error(std::string(
"TTG::PaRSEC: send_callback is invoked on datum of type ") +
typeid(std::decay_t<valueT>).name() +
" which is not copy constructible, std::move datum into send/broadcast statement");
3752 auto prepare_send_callback = [
this](
const valueT &value) {
3755 input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3764 input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3774 template <std::size_t...
IS>
3775 void register_input_callbacks(std::index_sequence<IS...>) {
3783 template <std::size_t...
IS,
typename inedgesT>
3784 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>,
inedgesT &
inedges) {
3785 int junk[] = {0, (std::get<IS>(
inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3790 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>,
outedgesT &
outedges) {
3791 int junk[] = {0, (std::get<IS>(
outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3806 template <
typename input_terminals_tupleT,
typename flowsT>
3809 std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
3816 if constexpr (std::is_same_v<keyT, void>) {
3827 if constexpr (
keyT_is_Void || std::is_same_v<keyT, void>) {
3838 if constexpr (std::is_same_v<keyT, void>) {
3843 std::stringstream
iss;
3864 const task_t *
task =
reinterpret_cast<const task_t*
>(parsec_task);
3865 std::stringstream
ss;
3877#if defined(PARSEC_PROF_TRACE)
3880 const task_t *
task =
reinterpret_cast<const task_t *
>(
data);
3883 snprintf(
reinterpret_cast<char*
>(
dst), size,
"()");
3885 std::stringstream
ss;
3887 snprintf(
reinterpret_cast<char*
>(
dst), size,
"%s",
ss.str().c_str());
3899 task_t *
task = (task_t*)parsec_task;
3901#ifdef TTG_HAVE_COROUTINE
3903 if (
task->suspended_task_address) {
3905#ifdef TTG_HAVE_DEVICE
3908 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(
task->suspended_task_address);
3913 assert(
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3914 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3917 if (
dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3921 task->tt->set_outputs_tls_ptr();
3930 task->suspended_task_address =
nullptr;
3935 for (
int i = 0;
i <
task->data_count;
i++) {
3937 if (
nullptr ==
copy)
continue;
3938 detail::release_data_copy(
copy);
3939 task->copies[
i] =
nullptr;
3942 for (
auto& c :
task->tt->constraints_complete) {
3943 if constexpr(std::is_void_v<keyT>) {
3953 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
3954 typename priomapT = ttg::detail::default_priomap<keyT>>
3955 TT(
const std::string &name,
const std::vector<std::string> &
innames,
const std::vector<std::string> &
outnames,
3957 :
ttg::
TTBase(name, numinedges, numouts)
3965 if (
innames.size() != numinedges)
throw std::logic_error(
"ttg_parsec::TT: #input names != #input terminals");
3966 if (
outnames.size() != numouts)
throw std::logic_error(
"ttg_parsec::TT: #output names != #output terminals");
3971 if constexpr (numinedges == numins) {
3979 register_input_callbacks(std::make_index_sequence<numinedges>{});
3986 self.nb_parameters = 0;
3994 self.nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3996 self.nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
3999 self.params[0] = &detail::parsec_taskclass_param0;
4000 self.params[1] = &detail::parsec_taskclass_param1;
4002 self.locals[0] = &detail::parsec_taskclass_param0;
4003 self.locals[1] = &detail::parsec_taskclass_param1;
4004 self.locals[2] = &detail::parsec_taskclass_param2;
4005 self.locals[3] = &detail::parsec_taskclass_param3;
4007 self.make_key = make_key;
4008 self.key_functions = &tasks_hash_fcts;
4009 self.task_snprintf = parsec_ttg_task_snprintf;
4011#if defined(PARSEC_PROF_TRACE)
4036#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
4059 self.complete_execution = complete_task_and_release;
4069 flow->flow_index =
i;
4070 flow->flow_datatype_mask = ~0;
4078 flow->name =
strdup((std::string(
"flow out") + std::to_string(
i)).
c_str());
4083 flow->flow_index =
i;
4084 flow->flow_datatype_mask = (1 <<
i);
4090 self.dependencies_goal = numins;
4094 for (
int i = 0;
i < context->nb_vp;
i++) {
4095 nbthreads += context->virtual_processes[
i]->nb_cores;
4108 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4109 typename priomapT = ttg::detail::default_priomap<keyT>>
4110 TT(
const std::string &name,
const std::vector<std::string> &
innames,
const std::vector<std::string> &
outnames,
4115 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4116 typename priomapT = ttg::detail::default_priomap<keyT>>
4121 connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{},
inedges);
4122 connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{},
outedges);
4124 if constexpr (numinedges > 0) {
4125 register_input_callbacks(std::make_index_sequence<numinedges>{});
4128 template <
typename keymapT = ttg::detail::default_keymap<keyT>,
4129 typename priomapT = ttg::detail::default_priomap<keyT>>
4131 const std::vector<std::string> &
innames,
const std::vector<std::string> &
outnames,
4138 if(
nullptr !=
self.name ) {
4140 self.name =
nullptr;
4143 for (std::size_t
i = 0;
i < numins; ++
i) {
4144 if (inpute_reducers_taskclass[
i] !=
nullptr) {
4145 std::free(inpute_reducers_taskclass[
i]);
4146 inpute_reducers_taskclass[
i] =
nullptr;
4156 std::cout <<
"Left over task " << op->
get_name() <<
" " <<
task->key << std::endl;
4158 std::cout <<
"Left over task " << op->
get_name() << std::endl;
4184 self.in[
i] =
nullptr;
4189 self.out[
i] =
nullptr;
4192 world.
impl().deregister_op(
this);
4202 template <std::
size_t i,
typename Reducer>
4205 std::get<i>(input_reducers) =
reducer;
4208 if (
nullptr ==
tc) {
4210 inpute_reducers_taskclass[
i] =
tc;
4214 tc->nb_parameters = 0;
4216 tc->nb_flows = numflows;
4222 tc->nb_parameters = (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4224 tc->nb_locals =
self.nb_parameters + (
sizeof(
void*)+
sizeof(
int)-1)/
sizeof(
int);
4227 tc->params[0] = &detail::parsec_taskclass_param0;
4228 tc->params[1] = &detail::parsec_taskclass_param1;
4230 tc->locals[0] = &detail::parsec_taskclass_param0;
4231 tc->locals[1] = &detail::parsec_taskclass_param1;
4232 tc->locals[2] = &detail::parsec_taskclass_param2;
4233 tc->locals[3] = &detail::parsec_taskclass_param3;
4235 tc->make_key = make_key;
4236 tc->key_functions = &tasks_hash_fcts;
4237 tc->task_snprintf = parsec_ttg_task_snprintf;
4239#if defined(PARSEC_PROF_TRACE)
4272 tc->complete_execution =
NULL;
4283 template <std::
size_t i,
typename Reducer>
4291 template <std::
size_t i>
4292 std::tuple_element_t<i, input_terminals_type> *
in() {
4293 return &std::get<i>(input_terminals);
4298 template <std::
size_t i>
4299 std::tuple_element_t<i, output_terminalsT> *
out() {
4300 return &std::get<i>(output_terminals);
4304 template <
typename Key = keyT>
4308 if constexpr(!std::is_same_v<Key, key_type>) {
4321 template <
typename Key = keyT>
4333 template <
typename Key = keyT>
4338 if constexpr(!std::is_same_v<Key, key_type>) {
4349 template <
typename Key = keyT>
4366 template<
typename Key,
typename Arg,
typename...
Args, std::size_t
I, std::size_t...
Is>
4367 void invoke_arglist(std::index_sequence<I, Is...>,
const Key& key,
Arg&&
arg,
Args&&... args) {
4368 using arg_type = std::decay_t<Arg>;
4374 copy->reset_readers();
4377 ttg_parsec::detail::release_data_copy(
copy);
4378 if constexpr (std::is_rvalue_reference_v<Arg>) {
4385 if constexpr (
sizeof...(Is) > 0) {
4387 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4396 static_assert(
sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4397 "Number of arguments to invoke must match the number of task inputs.");
4401 std::forward<Arg>(
arg), std::forward<Args>(args)...);
4409 m_defer_writer = value;
4413 return m_defer_writer;
4418 world.
impl().register_tt_profiling(
this);
4428 template <
typename Keymap>
4439 template <
typename Priomap>
4441 priomap = std::forward<Priomap>(
pm);
4449 template<
typename Devicemap>
4452 if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
4454 devicemap = std::forward<Devicemap>(
dm);
4457 devicemap = [=](
const keyT& key) {
4465 throw std::runtime_error(
"Unknown device type!");
4478 template<
typename Constra
int>
4480 std::size_t
cid = constraints_check.size();
4483 constraints_check.push_back([c,
this](){
return c->check(
this); });
4484 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(
this);
return true; });
4486 c->add_listener([
this,
cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4487 constraints_check.push_back([c,
this](
const keyT& key){
return c->check(key,
this); });
4488 constraints_complete.push_back([c,
this](
const keyT& key){ c->complete(key,
this);
return true; });
4494 template<
typename Constra
int>
4497 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4503 template<
typename Constra
int,
typename Mapper>
4505 static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
4506 std::size_t
cid = constraints_check.size();
4509 constraints_check.push_back([map, c,
this](){
return c->check(map(),
this); });
4510 constraints_complete.push_back([map, c,
this](){ c->complete(map(),
this);
return true; });
4512 c->add_listener([
this,
cid](
const std::span<keyT>& keys){ this->
release_constraint(cid, keys); },
this);
4513 constraints_check.push_back([map, c,
this](
const keyT& key){
return c->check(key, map(key),
this); });
4514 constraints_complete.push_back([map, c,
this](
const keyT& key){ c->complete(key, map(key),
this);
return true; });
4521 template<
typename Constra
int,
typename Mapper>
4524 this->
add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4543 std::vector<static_set_arg_fct_arg_t>
tmp;
4544 for (
auto it =
se.first;
it !=
se.second;) {
4546 tmp.push_back(std::move(
it->second));
4551 for (
auto&
it :
tmp) {
4554 std::get<1>(
it).get(),
", ", std::get<2>(
it),
")");
4580 bool do_release =
true;
4586 : copy_to_remove(h.copy_to_remove)
4588 h.copy_to_remove =
nullptr;
4594 std::swap(copy_to_remove, h.copy_to_remove);
4599 if (
nullptr != copy_to_remove) {
4602 ttg_parsec::detail::release_data_copy(copy_to_remove);
4607 template <
typename Value>
4608 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&>
operator()(Value &&value) {
4609 constexpr auto value_is_rvref = std::is_rvalue_reference_v<
decltype(value)>;
4610 using value_type = std::remove_reference_t<Value>;
4611 static_assert(value_is_rvref ||
4612 std::is_copy_constructible_v<std::decay_t<Value>>,
4613 "Data sent without being moved must be copy-constructible!");
4616 if (
nullptr == caller) {
4617 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4621 copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4622 value_type *value_ptr = &value;
4623 if (
nullptr == copy) {
4629 bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4631 value_ptr =
reinterpret_cast<value_type *
>(copy->
get_ptr());
4632 copy_to_remove = copy;
4634 if constexpr (value_is_rvref) {
4642 if constexpr (value_is_rvref)
4643 return std::move(*value_ptr);
4648 template<
typename Value>
4651 if (
nullptr == caller) {
4652 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4655 copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.
value_ref);
4656 if (
nullptr == copy) {
4659 bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4661 copy_to_remove = copy;
4673 template <
typename Value>
4676 if (
nullptr == caller) {
4677 throw std::runtime_error(
"ERROR: ttg::send or ttg::broadcast called outside of a task!");
4680 copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4681 const Value *value_ptr = &value;
4682 if (
nullptr == copy) {
4688 bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4690 value_ptr =
reinterpret_cast<Value *
>(copy->
get_ptr());
4691 copy_to_remove = copy;
#define TTG_OP_ASSERT_EXECUTABLE()
Edge is used to connect In and Out terminals.
A base class for all template tasks.
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
auto get_instance_id() const
virtual void make_executable()=0
Marks this executable.
void register_input_terminals(terminalsT &terms, const namesT &names)
const TTBase * ttg_ptr() const
const std::string & get_name() const
Gets the name of this operation.
void register_output_terminals(terminalsT &terms, const namesT &names)
A complete version of void.
Base class for implementation-specific Worlds.
WorldImplBase(int size, int rank)
bool is_valid(void) const
Represents a device in a specific execution space.
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
void print_incomplete_tasks()
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
void add_constraint(Constraint c, Mapper &&map)
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
ttg::World get_world() const override final
static constexpr int numinvals
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
std::tuple_element_t< i, output_terminalsT > * out()
void set_priomap(Priomap &&pm)
void set_input_reducer(Reducer &&reducer)
void set_devicemap(Devicemap &&dm)
void set_keymap(Keymap &&km)
keymap setter
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
void add_constraint(std::shared_ptr< Constraint > c)
void finalize_argstream_from_msg(void *data, std::size_t size)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
std::tuple_element_t< i, input_terminals_type > * in()
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
void set_input_reducer(Reducer &&reducer, std::size_t size)
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
bool check_constraints(task_t *task)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
output_terminalsT output_terminals_type
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
void set_defer_writer(bool value)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
virtual void release() override
bool get_defer_writer(bool value)
static void ht_iter_cb(void *item, void *cb_data)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
const auto & get_output_terminals() const
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
static constexpr bool derived_has_device_op()
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
const decltype(priomap) & get_priomap() const
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
task_t * create_new_task(const Key &key)
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
void set_static_argstream_size(std::size_t size)
static constexpr const ttg::Runtime runtime
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
static resultT get(InTuple &&intuple)
static constexpr bool derived_has_cuda_op()
void register_static_op_function(void)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
void add_constraint(Constraint &&c)
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
void argstream_set_size_from_msg(void *data, std::size_t size)
void set_arg_from_msg(void *data, std::size_t size)
void make_executable() override
Marks this executable.
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
const decltype(keymap) & get_keymap() const
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
static constexpr bool derived_has_hip_op()
void copy_mark_pushout(const Value &value)
void get_from_pull_msg(void *data, std::size_t size)
parsec_thread_mempool_t * get_task_mempool(void)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
static auto & get(InTuple &&intuple)
static constexpr bool derived_has_level_zero_op()
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
actual_input_tuple_type input_args_type
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
auto * execution_stream()
virtual void execute() override
static constexpr int parsec_ttg_rma_tag()
void decrement_inflight_msg()
void increment_inflight_msg()
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl & operator=(WorldImpl &&other)=delete
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
bool mpi_support(ttg::ExecutionSpace space)
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
virtual bool profiling() override
virtual void dag_off() override
virtual void fence_impl(void) override
virtual void dag_on(const std::string &filename) override
static constexpr int parsec_ttg_tag()
virtual void final_task() override
virtual void destroy() override
const ttg::Edge & ctl_edge() const
virtual void profile_on() override
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
constexpr auto data(C &c) -> decltype(c.data())
typename make_index_sequence_t< I... >::type make_index_sequence
void deregister_world(ttg::base::WorldImplBase &world)
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
void buffer_apply(T &&t, Fn &&fn)
void set_current(int device, Stream stream)
bool all_devices_peer_access
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
std::size_t max_inline_size
void foreach_parsec_data(Value &&value, Fn &&fn)
int ttg_device_to_parsec_device(const ttg::device::Device &device)
constexpr const int PARSEC_TTG_MAX_AM_SIZE
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
ttg_data_copy_t * create_new_datacopy(Value &&value)
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
this contains PaRSEC-based TTG functionality
void ttg_fence(ttg::World world)
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
std::mutex static_map_mutex
void ttg_register_callback(ttg::World world, Callback &&callback)
ttg::Edge & ttg_ctl_edge(ttg::World world)
void make_executable_hook(ttg::World &)
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
std::tuple< int, std::unique_ptr< std::byte[]>, size_t > static_set_arg_fct_arg_t
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
ttg::World ttg_default_execution_context()
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
void ttg_execute(ttg::World world)
void ttg_sum(ttg::World world, double &value)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void > > &status_ptr)
top-level TTG namespace contains runtime-neutral functionality
ExecutionSpace
denotes task execution space
int size(World world=default_execution_context())
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
ttg::World & get_default_world()
World default_execution_context()
Accesses the default backend's default execution context.
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
bool tracing()
returns whether tracing is enabled
void trace(const T &t, const Ts &... ts)
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
value_copy_handler(value_copy_handler &&h)
const Value & operator()(const Value &value)
value_copy_handler & operator=(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
value_copy_handler(const value_copy_handler &h)=delete
value_copy_handler & operator=(value_copy_handler &&h)
value_copy_handler()=default
Computes hash values for objects of type T.
task that can be resumed after some events occur
parsec_hash_table_t task_constraint_table
parsec_hash_table_t tasks_table
parsec_gpu_task_t * gpu_task
static constexpr std::size_t max_payload_size
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
unsigned char bytes[max_payload_size]
parsec_task_t parsec_task
ttg_data_copy_t ** copies
std::array< stream_info_t, num_streams > streams
ttg_data_copy_t * copies[num_copies]
lvalue_reference_type value_ref
static void drop_all_ptr()
virtual void * get_ptr()=0
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER