ttg_data_copy.h
#ifndef TTG_DATA_COPY_H
#define TTG_DATA_COPY_H

#include <utility>
#include <limits>
#include <vector>
#include <iterator>
#include <atomic>
#include <type_traits>
#include <algorithm>  // std::find, std::for_each
#include <memory>     // std::unique_ptr
#include <concepts>   // std::constructible_from
#include <cassert>

#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
#include <cuda_runtime.h>
#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT

#include <parsec.h>

#include "ttg/parsec/thread_local.h"
#include "ttg/parsec/parsec-ext.h"
#include "ttg/util/span.h"

namespace ttg_parsec {

  namespace detail {

    // fwd-decl
    struct ttg_data_copy_t;

    /* Wrapper managing the relationship between a ttg data copy and the parsec_data_t object */
    struct ttg_parsec_data_wrapper_t {

    protected:
      using parsec_data_ptr = std::unique_ptr<parsec_data_t, decltype(&parsec_data_destroy)>;

      /* the parsec_data_t object backing the host memory of this wrapper */
      parsec_data_ptr m_data;

      /* the ttg data copy that currently owns this wrapper */
      ttg_data_copy_t *m_ttg_copy = nullptr;

      static parsec_data_t* create_parsec_data(void *ptr, size_t size, bool sync_to_device) {
        parsec_data_t *data = parsec_data_create_with_type(nullptr, 0, ptr, size,
                                                           parsec_datatype_int8_t);
        data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED;
        data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED;
        // if we don't want to synchronize data to the device we set the version to 0
        data->device_copies[0]->version = (sync_to_device) ? 1 : 0;
        return data;
      }

      parsec_data_t* parsec_data() {
        return m_data.get();
      }

      const parsec_data_t* parsec_data() const {
        return m_data.get();
      }

      static void delete_parsec_data(parsec_data_t *data) {
#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
        if (data->device_copies[0]->flags & TTG_PARSEC_DATA_FLAG_REGISTERED) {
          // unregister the memory that was pinned for faster transfers
          cudaError_t status;
          status = cudaHostUnregister(data->device_copies[0]->device_private);
          assert(cudaSuccess == status);
          data->device_copies[0]->flags ^= TTG_PARSEC_DATA_FLAG_REGISTERED;
        }
#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
        assert(data->device_copies[0] != nullptr);
        auto copy = data->device_copies[0];
        parsec_data_copy_detach(data, data->device_copies[0], 0);
        PARSEC_OBJ_RELEASE(copy);
        PARSEC_OBJ_RELEASE(data);
      }

      static void delete_null_parsec_data(parsec_data_t *) {
        // nothing to be done, only used for nullptr
      }

    protected:

      /* remove the data from the owning data copy */
      void remove_from_owner();

      /* recreate the parsec_data_t to wrap the given host pointer */
      void reset_parsec_data(void *ptr, size_t size, bool sync_to_device);

      friend struct ttg_data_copy_t;

      ttg_parsec_data_wrapper_t();

      ttg_parsec_data_wrapper_t(const ttg_parsec_data_wrapper_t& other) = delete;

      ttg_parsec_data_wrapper_t(ttg_parsec_data_wrapper_t&& other);

      ttg_parsec_data_wrapper_t& operator=(const ttg_parsec_data_wrapper_t& other) = delete;

      ttg_parsec_data_wrapper_t& operator=(ttg_parsec_data_wrapper_t&& other);

      ~ttg_parsec_data_wrapper_t();

      /* set a new owning data copy object */
      void set_owner(ttg_data_copy_t& new_copy) {
        m_ttg_copy = &new_copy;
      }

      /* add a new copy to the data on the given device, backed by ptr */
      void add_copy(int parsec_dev, void *ptr) {
        parsec_data_copy_t* copy = parsec_data_copy_new(m_data.get(), parsec_dev,
                                                        parsec_datatype_int8_t,
                                                        PARSEC_DATA_FLAG_PARSEC_MANAGED);
        copy->device_private = ptr;
      }
    };
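
    // Illustrative sketch (an assumption, not code from this header): how a derived
    // buffer type would typically drive the protected hooks above, given a host
    // allocation `host_ptr` of `nbytes` bytes and a device pointer `dev_ptr` on
    // PaRSEC device `dev` (all placeholder names):
    //
    //   reset_parsec_data(host_ptr, nbytes, /*sync_to_device=*/true); // fresh parsec_data_t
    //   add_copy(dev, dev_ptr);      // attach a device copy backed by dev_ptr
    //   parsec_data();               // hand the parsec_data_t to PaRSEC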

    /* templated to break cyclic dependency with ttg_data_copy_container */
    template<typename T = ttg_data_copy_t>
    struct ttg_data_copy_container_setter {
      ttg_data_copy_container_setter(T *ptr) {
        /* set the container ptr here, will be reset in the ttg_data_value_copy_t ctor */
        ttg_data_copy_container() = ptr;
      }
    };


    /* special type: stores a pointer to the ttg_data_copy_t. This is necessary
     * because ttg_data_copy_t has virtual functions so we cannot cast from parsec_data_copy_t
     * to ttg_data_copy_t (offsetof is not supported for virtual classes).
     * The self pointer is a back-pointer to the ttg_data_copy_t. */
    struct ttg_data_copy_self_t {
      parsec_list_item_t super;
      ttg_data_copy_t *self;
      ttg_data_copy_self_t(ttg_data_copy_t *dc)
      : self(dc)
      {
        PARSEC_OBJ_CONSTRUCT(&super, parsec_list_item_t);
      }
    };


    /* Non-owning copy-tracking wrapper, accounting for N readers or 1 writer.
     * Also counts external references, which are not treated as
     * readers or writers but merely prevent the object from being
     * destroyed once no readers/writers exist.
     */
    struct ttg_data_copy_t : public ttg_data_copy_self_t {

      /* special value assigned to the readers counter to mark the copy as
       * mutable, i.e., a task will modify it */
      static constexpr int mutable_tag = std::numeric_limits<int>::min();

      ttg_data_copy_t()
      : ttg_data_copy_self_t(this)
      { }

151 
153  : ttg_data_copy_self_t(this)
154  {
155  /* we allow copying but do not copy any data over from the original
156  * device copies will have to be allocated again
157  * and it's a new object to reference */
158  }
159 
      ttg_data_copy_t(ttg_data_copy_t&& c)
      : ttg_data_copy_self_t(this)
      , m_next_task(c.m_next_task)
      , m_readers(c.m_readers)
      , m_refs(c.m_refs.load(std::memory_order_relaxed))
      , m_dev_data(std::move(c.m_dev_data))
      , m_single_dev_data(c.m_single_dev_data)
      , m_num_dev_data(c.m_num_dev_data)
      {
        c.m_num_dev_data = 0;
        c.m_readers = 0;
        c.m_single_dev_data = nullptr;
        c.m_next_task = nullptr;

        /* move all data to the new owner */
        foreach_wrapper([&](ttg_parsec_data_wrapper_t* data){
          data->set_owner(*this);
        });
      }

      ttg_data_copy_t& operator=(ttg_data_copy_t&& c)
      {
        m_next_task = c.m_next_task;
        c.m_next_task = nullptr;
        m_readers = c.m_readers;
        c.m_readers = 0;
        m_refs.store(c.m_refs.load(std::memory_order_relaxed), std::memory_order_relaxed);
        c.m_refs.store(0, std::memory_order_relaxed);
        m_dev_data = std::move(c.m_dev_data);
        m_single_dev_data = c.m_single_dev_data;
        c.m_single_dev_data = nullptr;
        m_num_dev_data = c.m_num_dev_data;
        c.m_num_dev_data = 0;

        /* move all data to the new owner */
        foreach_wrapper([&](ttg_parsec_data_wrapper_t* data){
          data->set_owner(*this);
        });
        return *this;
      }

      ttg_data_copy_t& operator=(const ttg_data_copy_t& c) {
        /* we allow copying but do not copy any data over from the original;
         * device copies will have to be allocated again
         * and it's a new object to reference */

        return *this;
      }

      /* mark destructor as virtual */
      virtual ~ttg_data_copy_t() = default;

      /* Returns true if the copy is mutable */
      bool is_mutable() const {
        return m_readers == mutable_tag;
      }

      /* Mark the copy as mutable */
      void mark_mutable() {
        m_readers = mutable_tag;
      }

      /* Increment the reader counter and return the previous value.
       * \tparam Atomic Whether to increment atomically. Default: true
       */
      template<bool Atomic = true>
      int increment_readers() {
        if constexpr(Atomic) {
          return parsec_atomic_fetch_inc_int32(&m_readers);
//          std::atomic_ref<int32_t> a{m_readers};
//          return a.fetch_add(1, std::memory_order_relaxed);
        } else {
          return m_readers++;
        }
      }

      /* Reset a mutable copy back to a single reader,
       * making the copy read-only again. */
      void reset_readers() {
        if (mutable_tag == m_readers) {
          m_readers = 1;
        }
      }

      /* Decrement the reader counter and return the previous value.
       * \tparam Atomic Whether to decrement atomically. Default: true
       */
      template<bool Atomic = true>
      int decrement_readers() {
        if constexpr(Atomic) {
          return parsec_atomic_fetch_dec_int32(&m_readers);
//          std::atomic_ref<int32_t> a{m_readers};
//          return a.fetch_sub(1, std::memory_order_relaxed);
        } else {
          return m_readers--;
        }
      }

      /* Returns the number of readers if the copy is immutable, or \c mutable_tag
       * if the copy is mutable */
      int num_readers() const {
        return m_readers;
      }

      /* Returns the pointer to the user data wrapped by the copy object */
      virtual void* get_ptr() = 0;

      parsec_task_t* get_next_task() const {
        return m_next_task;
      }

      void set_next_task(parsec_task_t* task) {
        m_next_task = task;
      }

      int32_t add_ref() {
        return m_refs.fetch_add(1, std::memory_order_relaxed);
      }

      int32_t drop_ref() {
        return m_refs.fetch_sub(1, std::memory_order_relaxed);
      }

      bool has_ref() {
        return (m_refs.load(std::memory_order_relaxed) != 0);
      }

      int32_t num_ref() const {
        return m_refs.load(std::memory_order_relaxed);
      }

      /* increment the version of the current copy */
      void inc_current_version() {
        //std::cout << "data-copy " << this << " inc_current_version " << " count " << m_num_dev_data << std::endl;
        foreach_parsec_data([](parsec_data_t* data){
          assert(data->device_copies[0] != nullptr);
          data->device_copies[0]->version++;
        });
      }

      void transfer_ownership(int access, int device = 0) {
        foreach_parsec_data([&](parsec_data_t* data){
          parsec_data_transfer_ownership_to_copy(data, device, access);
        });
      }

      /* manage device copies owned by this object:
       * we only touch the vector if we have more than one copy to track
       * and otherwise use the single-element member.
       */

      void add_device_data(ttg_parsec_data_wrapper_t* data) {
        switch (m_num_dev_data) {
          case 0:
            m_single_dev_data = data;
            break;
          case 1:
            /* move single copy into vector and add new copy below */
            m_dev_data.push_back(m_single_dev_data);
            m_single_dev_data = nullptr;
            /* fall-through */
          default:
            /* store in multi-copy vector */
            m_dev_data.push_back(data);
            break;
        }
        //std::cout << "data-copy " << this << " add data " << data << " count " << m_num_dev_data << std::endl;
        m_num_dev_data++;
      }

      void remove_device_data(ttg_parsec_data_wrapper_t* data) {
        //std::cout << "data-copy " << this << " remove data " << data << " count " << m_num_dev_data << std::endl;
        if (m_num_dev_data == 0) {
          /* this may happen if we're integrated into the object and have been moved */
          return;
        }
        if (m_num_dev_data == 1) {
          assert(m_single_dev_data == data);
          m_single_dev_data = nullptr;
        } else if (m_num_dev_data > 1) {
          auto it = std::find(m_dev_data.begin(), m_dev_data.end(), data);
          if (it != m_dev_data.end()) {
            m_dev_data.erase(it);
          }
        }
        --m_num_dev_data;
        /* make single-entry if needed */
        if (m_num_dev_data == 1) {
          m_single_dev_data = m_dev_data[0];
          m_dev_data.clear();
        }
      }

      int num_dev_data() const {
        return m_num_dev_data;
      }

      template<typename Fn>
      void foreach_wrapper(Fn&& fn) {
        if (m_num_dev_data == 1) {
          fn(m_single_dev_data);
        } else if (m_num_dev_data > 1) {
          std::for_each(m_dev_data.begin(), m_dev_data.end(), fn);
        }
      }

      template<typename Fn>
      void foreach_parsec_data(Fn&& fn) {
        if (m_num_dev_data == 1) {
          if (m_single_dev_data->parsec_data()) {
            fn(m_single_dev_data->parsec_data());
          }
        } else if (m_num_dev_data > 1) {
          std::for_each(m_dev_data.begin(), m_dev_data.end(),
            [&](ttg_parsec_data_wrapper_t* data){
              if (data->parsec_data()) {
                fn(data->parsec_data());
              }
            }
          );
        }
      }


#if 0
      iterator begin() {
        switch(m_num_dev_data) {
          // no device copies
          case 0: return end();
          case 1: return &m_single_dev_data;
          default: return m_dev_data.data();
        }
      }

      iterator end() {
        switch(m_num_dev_data) {
          case 0:
          case 1:
            return &(m_single_dev_data) + 1;
          default:
            return m_dev_data.data() + m_dev_data.size();
        }
      }
#endif // 0

      using iovec_iterator = typename std::vector<ttg::iovec>::iterator;

      iovec_iterator iovec_begin() {
        return m_iovecs.begin();
      }

      iovec_iterator iovec_end() {
        return m_iovecs.end();
      }

      void iovec_reset() {
        m_iovecs.clear();
      }

      void iovec_add(const ttg::iovec& iov) {
        m_iovecs.push_back(iov);
      }

      ttg::span<ttg::iovec> iovec_span() {
        return ttg::span<ttg::iovec>(m_iovecs.data(), m_iovecs.size());
      }

      std::size_t iovec_count() const {
        return m_iovecs.size();
      }

#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      int64_t size;
      int64_t uid;
#endif
    protected:
      parsec_task_t *m_next_task = nullptr;
      int32_t m_readers = 1;
      std::atomic<int32_t> m_refs = 1;          //< number of entities referencing this copy (TTGs, external)

      std::vector<ttg::iovec> m_iovecs;

      std::vector<ttg_parsec_data_wrapper_t*> m_dev_data; //< used if there are multiple device copies
                                                          //  that belong to this object
      ttg_parsec_data_wrapper_t *m_single_dev_data = nullptr; //< used if there is a single device copy
      int m_num_dev_data = 0;                   //< number of device copies
    };
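
    // Illustrative sketch (an assumption, not part of the interface above): typical
    // reader accounting on a shared copy. `copy` stands for a ttg_data_copy_t obtained
    // from the backend; the real call sites live in the PaRSEC backend, not here.
    //
    //   copy->increment_readers();              // another task starts reading
    //   /* ... read the data ... */
    //   if (copy->decrement_readers() == 1) {   // previous count was 1: last reader left
    //     // no readers remain; release the copy unless external references are held
    //     if (!copy->has_ref()) { /* release */ }
    //   }
    //
    //   // exclusive access instead: the sole reader may flip the copy to mutable
    //   if (copy->num_readers() == 1) copy->mark_mutable();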

    /* Extension of ttg_data_copy_t that holds the actual value.
     * The virtual destructor of the base class ensures that the
     * value is destroyed together with the copy object. */
    template<typename ValueT>
    struct ttg_data_value_copy_t final : private ttg_data_copy_container_setter<ttg_data_copy_t>
                                       , public ttg_data_copy_t {
      using value_type = ValueT;
      value_type m_value;

      template<typename T>
      requires(std::constructible_from<ValueT, T>)
      ttg_data_value_copy_t(T&& value)
      : ttg_data_copy_container_setter(this)
      , ttg_data_copy_t()
      , m_value(std::forward<T>(value))
      {
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
      }

      ttg_data_value_copy_t(ttg_data_value_copy_t&& c)
      noexcept(std::is_nothrow_move_constructible_v<value_type>)
      : ttg_data_copy_container_setter(this)
      , ttg_data_copy_t(std::move(c))
      , m_value(std::move(c.m_value))
      {
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
      }

      ttg_data_value_copy_t(const ttg_data_value_copy_t& c)
      noexcept(std::is_nothrow_copy_constructible_v<value_type>)
      : ttg_data_copy_container_setter(this)
      , ttg_data_copy_t(c)
      , m_value(c.m_value)
      {
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
      }

      ttg_data_value_copy_t& operator=(ttg_data_value_copy_t&& c)
      noexcept(std::is_nothrow_move_assignable_v<value_type>)
      {
        /* set the container ptr here, will be reset at the end of this operator */
        ttg_data_copy_container() = this;
        ttg_data_copy_t::operator=(std::move(c));
        m_value = std::move(c.m_value);
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
        return *this;
      }

      ttg_data_value_copy_t& operator=(const ttg_data_value_copy_t& c)
      noexcept(std::is_nothrow_copy_assignable_v<value_type>)
      {
        /* set the container ptr here, will be reset at the end of this operator */
        ttg_data_copy_container() = this;
        ttg_data_copy_t::operator=(c);
        m_value = c.m_value;
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
        return *this;
      }

      /* access the wrapped value */
      value_type& value_ref() {
        return m_value;
      }

      /* will destruct the value */
      virtual ~ttg_data_value_copy_t() = default;

      virtual void* get_ptr() override final {
        return &m_value;
      }
    };
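
    // Sketch of the container-setter handshake used by ttg_data_value_copy_t above
    // (illustrative; `Matrix` is a placeholder for a user type that contains a
    // ttg_parsec_data_wrapper_t-backed member):
    //
    //   ttg_data_value_copy_t<Matrix> copy(std::move(m));
    //   // 1. ttg_data_copy_container_setter(this) publishes the new copy in the
    //   //    thread-local ttg_data_copy_container() slot
    //   // 2. the construction of m_value runs next; any wrapper member inside
    //   //    Matrix reads that slot in its constructor and registers itself via
    //   //    add_device_data()
    //   // 3. the constructor body resets ttg_data_copy_container() to nullptr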

    /* ttg_parsec_data_wrapper_t member functions that require
     * the full definition of ttg_data_copy_t */

    inline
    void ttg_parsec_data_wrapper_t::remove_from_owner() {
      if (nullptr != m_ttg_copy) {
        m_ttg_copy->remove_device_data(this);
        m_ttg_copy = nullptr;
      }
    }

    inline
    void ttg_parsec_data_wrapper_t::reset_parsec_data(void *ptr, size_t size, bool sync_to_device) {
      if (ptr == m_data.get()) return;

      if (nullptr == ptr) {
        m_data = parsec_data_ptr(nullptr, &delete_null_parsec_data);
      } else {
        m_data = parsec_data_ptr(create_parsec_data(ptr, size, sync_to_device), &delete_parsec_data);
      }
    }

    inline
    ttg_parsec_data_wrapper_t::ttg_parsec_data_wrapper_t()
    : m_data(nullptr, delete_null_parsec_data)
    , m_ttg_copy(detail::ttg_data_copy_container())
    {
      if (m_ttg_copy) {
        m_ttg_copy->add_device_data(this);
      }
    }

    inline
    ttg_parsec_data_wrapper_t::ttg_parsec_data_wrapper_t(ttg_parsec_data_wrapper_t&& other)
    : m_data(std::move(other.m_data))
    , m_ttg_copy(detail::ttg_data_copy_container())
    {
      // try to remove the old buffer from the *old* ttg_copy
      other.remove_from_owner();

      // register with the new ttg_copy
      if (nullptr != m_ttg_copy) {
        m_ttg_copy->add_device_data(this);
      }
    }

    inline
    ttg_parsec_data_wrapper_t& ttg_parsec_data_wrapper_t::operator=(ttg_parsec_data_wrapper_t&& other) {
      m_data = std::move(other.m_data);
      /* remove from old ttg copy */
      other.remove_from_owner();

      if (nullptr != m_ttg_copy) {
        /* register with the new ttg_copy */
        m_ttg_copy->add_device_data(this);
      }
      return *this;
    }


    inline
    ttg_parsec_data_wrapper_t::~ttg_parsec_data_wrapper_t() {
      if (nullptr != m_ttg_copy) {
        m_ttg_copy->remove_device_data(this);
        m_ttg_copy = nullptr;
      }
    }


  } // namespace detail

} // namespace ttg_parsec

#endif // TTG_DATA_COPY_H
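
A minimal usage sketch follows (an illustration only; the PaRSEC backend normally creates and releases copies through its own internal helpers rather than the direct new/delete shown here, and example() is a placeholder name):

  #include "ttg/parsec/ttg_data_copy.h"

  void example() {
    using namespace ttg_parsec::detail;

    // wrap a user value; the constructor temporarily registers the copy as the
    // thread-local container so device-capable members can attach themselves
    ttg_data_copy_t *copy = new ttg_data_value_copy_t<double>(3.14);

    if (copy->num_readers() == 1) copy->mark_mutable();    // sole reader takes write access
    double &val = *static_cast<double*>(copy->get_ptr());  // access the wrapped value
    val += 1.0;                                            // modify it
    copy->reset_readers();                                 // back to a single read-only reader

    copy->drop_ref();                                      // give up the initial reference
    if (copy->decrement_readers() == 1 && !copy->has_ref()) {
      delete copy;                                         // last reader, no external refs
    }
  }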