// ttg_data_copy.h
#ifndef TTG_DATA_COPY_H
#define TTG_DATA_COPY_H

#include <utility>
#include <limits>
#include <vector>
#include <iterator>
#include <atomic>
#include <type_traits>
#include <memory>     // std::unique_ptr
#include <algorithm>  // std::find, std::for_each
#include <cassert>

#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
#include <cuda_runtime.h>
#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT

#include <parsec.h>

#include "ttg/parsec/thread_local.h"
#include "ttg/parsec/parsec-ext.h"
#include "ttg/util/span.h"

namespace ttg_parsec {

  namespace detail {

    // fwd-decl
    struct ttg_data_copy_t;

    /* Wrapper managing the relationship between a ttg data copy and the parsec_data_t object */
    struct ttg_parsec_data_wrapper_t {

    protected:
      using parsec_data_ptr = std::unique_ptr<parsec_data_t, decltype(&parsec_data_destroy)>;

      /* the parsec_data_t wrapping the host memory (null if no memory is attached) */
      parsec_data_ptr m_data;
      /* the data copy object currently owning this wrapper */
      ttg_data_copy_t *m_ttg_copy = nullptr;

      /* the copy object needs access to the protected accessors below */
      friend ttg_data_copy_t;

      static parsec_data_t* create_parsec_data(void *ptr, size_t size) {
        parsec_data_t *data = parsec_data_create_with_type(nullptr, 0, ptr, size,
                                                           parsec_datatype_int8_t);
        data->device_copies[0]->flags |= PARSEC_DATA_FLAG_PARSEC_MANAGED;
        data->device_copies[0]->coherency_state = PARSEC_DATA_COHERENCY_SHARED;
        data->device_copies[0]->version = 1;
        return data;
      }

      parsec_data_t* parsec_data() {
        return m_data.get();
      }

      const parsec_data_t* parsec_data() const {
        return m_data.get();
      }

      static void delete_parsec_data(parsec_data_t *data) {
#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
        if (data->device_copies[0]->flags & TTG_PARSEC_DATA_FLAG_REGISTERED) {
          // unregister the memory that was pinned for faster access
          cudaError_t status;
          status = cudaHostUnregister(data->device_copies[0]->device_private);
          assert(cudaSuccess == status);
          data->device_copies[0]->flags ^= TTG_PARSEC_DATA_FLAG_REGISTERED;
        }
#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
        //std::fprintf(stderr, "parsec_data_destroy %p device_copy[0] %p\n", data, data->device_copies[0]);
        //parsec_data_destroy(data);
        assert(data->device_copies[0] != nullptr);
        auto copy = data->device_copies[0];
        parsec_data_copy_detach(data, data->device_copies[0], 0);
        PARSEC_OBJ_RELEASE(copy);
        PARSEC_OBJ_RELEASE(data);
      }

      static void delete_null_parsec_data(parsec_data_t *) {
        // nothing to be done, only used for nullptr
      }

    protected:

      /* remove the data from the owning data copy */
      void remove_from_owner();

      /* add the data to the owning data copy */
      void reset_parsec_data(void *ptr, size_t size);

      /* default-constructible and movable, but not copyable */
      ttg_parsec_data_wrapper_t();

      ttg_parsec_data_wrapper_t(const ttg_parsec_data_wrapper_t& other) = delete;

      ttg_parsec_data_wrapper_t(ttg_parsec_data_wrapper_t&& other);

      ttg_parsec_data_wrapper_t& operator=(const ttg_parsec_data_wrapper_t& other) = delete;

      ttg_parsec_data_wrapper_t& operator=(ttg_parsec_data_wrapper_t&& other);

      ~ttg_parsec_data_wrapper_t();

      /* set a new owning data copy object */
      void set_owner(ttg_data_copy_t& new_copy) {
        m_ttg_copy = &new_copy;
      }
    };
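
    /* Illustrative sketch (an assumption, not part of this header): a device-aware buffer
     * type would derive from ttg_parsec_data_wrapper_t and hand its host allocation to
     * PaRSEC through reset_parsec_data(); registration with the owning ttg_data_copy_t
     * happens implicitly via ttg_data_copy_container(). The type `my_buffer` below is
     * purely hypothetical.
     *
     *   struct my_buffer : private detail::ttg_parsec_data_wrapper_t {
     *     std::vector<double> m_host;
     *     explicit my_buffer(std::size_t n) : m_host(n) {
     *       // wrap the host memory in a parsec_data_t tracked by the owning copy
     *       reset_parsec_data(m_host.data(), m_host.size() * sizeof(double));
     *     }
     *   };
     */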


    /* templated to break cyclic dependency with ttg_data_copy_container */
    template<typename T = ttg_data_copy_t>
    struct ttg_data_copy_container_setter {
      ttg_data_copy_container_setter(T* ptr) {
        /* set the container ptr here, will be reset in the ttg_data_value_copy_t ctor */
        ttg_data_copy_container() = ptr;
      }
    };

    /* special type: stores a pointer to the ttg_data_copy_t. This is necessary
     * because ttg_data_copy_t has virtual functions so we cannot cast from parsec_data_copy_t
     * to ttg_data_copy_t (offsetof is not supported for virtual classes).
     * The self pointer is a back-pointer to the ttg_data_copy_t. */
    struct ttg_data_copy_self_t {
      parsec_list_item_t super;
      ttg_data_copy_t *self;
      ttg_data_copy_self_t(ttg_data_copy_t *dc)
      : self(dc)
      {
        PARSEC_OBJ_CONSTRUCT(&super, parsec_list_item_t);
      }
    };
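
    /* Illustrative sketch (an assumption about intended use): given a parsec_list_item_t*
     * taken from a PaRSEC list, the owning ttg_data_copy_t is recovered through the
     * embedded back-pointer rather than by casting the full (virtual) object:
     *
     *   inline ttg_data_copy_t* to_ttg_copy(parsec_list_item_t *item) {   // hypothetical helper
     *     return reinterpret_cast<ttg_data_copy_self_t*>(item)->self;
     *   }
     */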

    /* Non-owning copy-tracking wrapper, accounting for N readers or 1 writer.
     * Also counts external references, which are not treated as
     * readers or writers but merely prevent the object from being
     * destroyed once no readers/writers exist.
     */
    struct ttg_data_copy_t : public ttg_data_copy_self_t {

      /* special value assigned to parsec_data_copy_t::readers to mark the copy as
       * mutable, i.e., a task will modify it */
      static constexpr int mutable_tag = std::numeric_limits<int>::min();

      ttg_data_copy_t()
      : ttg_data_copy_self_t(this)
      { }

      ttg_data_copy_t(const ttg_data_copy_t& c)
      : ttg_data_copy_self_t(this)
      {
        /* we allow copying but do not copy any data over from the original;
         * device copies will have to be allocated again
         * and it's a new object to reference */
      }

      ttg_data_copy_t(ttg_data_copy_t&& c)
      : ttg_data_copy_self_t(this)
      , m_next_task(c.m_next_task)
      , m_readers(c.m_readers)
      , m_refs(c.m_refs.load(std::memory_order_relaxed))
      , m_dev_data(std::move(c.m_dev_data))
      , m_single_dev_data(c.m_single_dev_data)
      , m_num_dev_data(c.m_num_dev_data)
      {
        c.m_num_dev_data = 0;
        c.m_readers = 0;
        c.m_single_dev_data = nullptr;

        /* move all device data to the new owner */
        foreach_wrapper([&](ttg_parsec_data_wrapper_t* data){
          data->set_owner(*this);
        });
      }

      ttg_data_copy_t& operator=(ttg_data_copy_t&& c)
      {
        m_next_task = c.m_next_task;
        c.m_next_task = nullptr;
        m_readers = c.m_readers;
        c.m_readers = 0;
        m_refs.store(c.m_refs.load(std::memory_order_relaxed), std::memory_order_relaxed);
        c.m_refs.store(0, std::memory_order_relaxed);
        m_dev_data = std::move(c.m_dev_data);
        m_single_dev_data = c.m_single_dev_data;
        c.m_single_dev_data = nullptr;
        m_num_dev_data = c.m_num_dev_data;
        c.m_num_dev_data = 0;

        /* move all data to the new owner */
        foreach_wrapper([&](ttg_parsec_data_wrapper_t* data){
          data->set_owner(*this);
        });
        return *this;
      }

      ttg_data_copy_t& operator=(const ttg_data_copy_t& c) {
        /* we allow copying but do not copy any data over from the original;
         * device copies will have to be allocated again
         * and it's a new object to reference */

        return *this;
      }

      /* mark destructor as virtual */
      virtual ~ttg_data_copy_t() = default;

      /* Returns true if the copy is mutable */
      bool is_mutable() const {
        return m_readers == mutable_tag;
      }

      /* Mark the copy as mutable */
      void mark_mutable() {
        m_readers = mutable_tag;
      }

      /* Increment the reader counter and return the previous value.
       * \tparam Atomic Whether to increment atomically. Default: true
       */
      template<bool Atomic = true>
      int increment_readers() {
        if constexpr(Atomic) {
          return parsec_atomic_fetch_inc_int32(&m_readers);
//          std::atomic_ref<int32_t> a{m_readers};
//          return a.fetch_add(1, std::memory_order_relaxed);
        } else {
          return m_readers++;
        }
      }

      /* Reset a mutable copy back to a single reader */
      void reset_readers() {
        if (mutable_tag == m_readers) {
          m_readers = 1;
        }
      }

      /* Decrement the reader counter and return the previous value.
       * \tparam Atomic Whether to decrement atomically. Default: true
       */
      template<bool Atomic = true>
      int decrement_readers() {
        if constexpr(Atomic) {
          return parsec_atomic_fetch_dec_int32(&m_readers);
//          std::atomic_ref<int32_t> a{m_readers};
//          return a.fetch_sub(1, std::memory_order_relaxed);
        } else {
          return m_readers--;
        }
      }

      /* Returns the number of readers if the copy is immutable, or \c mutable_tag
       * if the copy is mutable */
      int num_readers() const {
        return m_readers;
      }

      /* Returns the pointer to the user data wrapped by the copy object */
      virtual void* get_ptr() = 0;

      parsec_task_t* get_next_task() const {
        return m_next_task;
      }

      void set_next_task(parsec_task_t* task) {
        m_next_task = task;
      }

      int32_t add_ref() {
        return m_refs.fetch_add(1, std::memory_order_relaxed);
      }

      int32_t drop_ref() {
        return m_refs.fetch_sub(1, std::memory_order_relaxed);
      }

      bool has_ref() {
        return (m_refs.load(std::memory_order_relaxed) != 0);
      }

      int32_t num_ref() const {
        return m_refs.load(std::memory_order_relaxed);
      }
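
      /* Illustrative sketch (an assumption, for exposition only): how the reader/writer
       * and reference accounting is typically exercised by a caller:
       *
       *   ttg_data_copy_t *copy = ...;     // some concrete ttg_data_value_copy_t<T>
       *   copy->add_ref();                 // external reference keeps the copy alive
       *   if (!copy->is_mutable()) {
       *     copy->increment_readers();     // register as one of N concurrent readers
       *     auto &value = *static_cast<T*>(copy->get_ptr());
       *     // ... read value ...
       *     copy->decrement_readers();
       *   } else {
       *     // a single writer owns the copy; readers must wait or work on another copy
       *   }
       *   copy->drop_ref();                // release the external reference
       */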

      /* increment the version of the current copy */
      void inc_current_version() {
        //std::cout << "data-copy " << this << " inc_current_version " << " count " << m_num_dev_data << std::endl;
        foreach_parsec_data([](parsec_data_t* data){
          assert(data->device_copies[0] != nullptr);
          data->device_copies[0]->version++;
        });
      }

      void transfer_ownership(int access, int device = 0) {
        foreach_parsec_data([&](parsec_data_t* data){
          parsec_data_transfer_ownership_to_copy(data, device, access);
        });
      }

      /* manage device copies owned by this object;
       * we only touch the vector if we have more than one copy to track
       * and otherwise use the single-element member.
       */

      void add_device_data(ttg_parsec_data_wrapper_t* data) {
        switch (m_num_dev_data) {
          case 0:
            m_single_dev_data = data;
            break;
          case 1:
            /* move single copy into vector and add new copy below */
            m_dev_data.push_back(m_single_dev_data);
            m_single_dev_data = nullptr;
            /* fall-through */
          default:
            /* store in multi-copy vector */
            m_dev_data.push_back(data);
            break;
        }
        //std::cout << "data-copy " << this << " add data " << data << " count " << m_num_dev_data << std::endl;
        m_num_dev_data++;
      }

      void remove_device_data(ttg_parsec_data_wrapper_t* data) {
        //std::cout << "data-copy " << this << " remove data " << data << " count " << m_num_dev_data << std::endl;
        if (m_num_dev_data == 0) {
          /* this may happen if we're integrated into the object and have been moved */
          return;
        }
        if (m_num_dev_data == 1) {
          assert(m_single_dev_data == data);
          m_single_dev_data = nullptr;
        } else if (m_num_dev_data > 1) {
          auto it = std::find(m_dev_data.begin(), m_dev_data.end(), data);
          if (it != m_dev_data.end()) {
            m_dev_data.erase(it);
          }
        }
        --m_num_dev_data;
        /* fall back to the single-entry member if only one wrapper remains */
        if (m_num_dev_data == 1) {
          m_single_dev_data = m_dev_data[0];
          m_dev_data.clear();
        }
      }

      int num_dev_data() const {
        return m_num_dev_data;
      }

      template<typename Fn>
      void foreach_wrapper(Fn&& fn) {
        if (m_num_dev_data == 1) {
          fn(m_single_dev_data);
        } else if (m_num_dev_data > 1) {
          std::for_each(m_dev_data.begin(), m_dev_data.end(), fn);
        }
      }

      template<typename Fn>
      void foreach_parsec_data(Fn&& fn) {
        if (m_num_dev_data == 1) {
          if (m_single_dev_data->parsec_data()) {
            fn(m_single_dev_data->parsec_data());
          }
        } else if (m_num_dev_data > 1) {
          std::for_each(m_dev_data.begin(), m_dev_data.end(),
            [&](ttg_parsec_data_wrapper_t* data){
              if (data->parsec_data()) {
                fn(data->parsec_data());
              }
            });
        }
      }
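
      /* Illustrative sketch (an assumption): callers walk the PaRSEC data objects tracked
       * by this copy through foreach_parsec_data(), in the same way transfer_ownership()
       * does above, e.g. to count how many wrappers currently hold a parsec_data_t:
       *
       *   std::size_t tracked = 0;
       *   copy->foreach_parsec_data([&](parsec_data_t*){ ++tracked; });
       */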


#if 0
      iterator begin() {
        switch(m_num_dev_data) {
          // no device copies
          case 0: return end();
          case 1: return &m_single_dev_data;
          default: return m_dev_data.data();
        }
      }

      iterator end() {
        switch(m_num_dev_data) {
          case 0:
          case 1:
            return &(m_single_dev_data) + 1;
          default:
            return m_dev_data.data() + m_dev_data.size();
        }
      }
#endif // 0

      using iovec_iterator = typename std::vector<ttg::iovec>::iterator;

      iovec_iterator iovec_begin() {
        return m_iovecs.begin();
      }

      iovec_iterator iovec_end() {
        return m_iovecs.end();
      }

      void iovec_reset() {
        m_iovecs.clear();
      }

      void iovec_add(const ttg::iovec& iov) {
        m_iovecs.push_back(iov);
      }

      ttg::span<ttg::iovec> iovec_span() {
        return ttg::span<ttg::iovec>(m_iovecs.data(), m_iovecs.size());
      }

      std::size_t iovec_count() const {
        return m_iovecs.size();
      }
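
      /* Illustrative sketch (an assumption): the runtime collects the memory regions of
       * the wrapped value that need to be shipped as iovecs and later walks them:
       *
       *   copy->iovec_reset();
       *   copy->iovec_add(iov);                  // iov describes one contiguous region
       *   for (auto &&v : copy->iovec_span()) {
       *     // hand the region described by v to the transport layer
       *   }
       *   std::size_t n = copy->iovec_count();   // number of regions collected
       */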

#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      int64_t size;
      int64_t uid;
#endif

    protected:
      parsec_task_t *m_next_task = nullptr;
      int32_t m_readers = 1;
      std::atomic<int32_t> m_refs = 1;                     //< number of entities referencing this copy (TTGs, external)

      std::vector<ttg::iovec> m_iovecs;

      std::vector<ttg_parsec_data_wrapper_t*> m_dev_data;  //< used if there are multiple device copies
                                                           //  that belong to this object
      ttg_parsec_data_wrapper_t *m_single_dev_data;        //< used if there is a single device copy
      int m_num_dev_data = 0;                              //< number of device copies
    };


    /* A ttg_data_copy_t that holds the actual value of type ValueT.
     * The container-setter base is initialized first so that any
     * ttg_parsec_data_wrapper_t members constructed inside the value
     * register with this copy as their owner. */
    template<typename ValueT>
    struct ttg_data_value_copy_t final : private ttg_data_copy_container_setter<ttg_data_copy_t>
                                       , public ttg_data_copy_t {
      using value_type = ValueT;
      value_type m_value;

      template<typename T>
      ttg_data_value_copy_t(T&& value)
      : ttg_data_copy_container_setter(this)
      , ttg_data_copy_t()
      , m_value(std::forward<T>(value))
      {
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
      }

      ttg_data_value_copy_t(ttg_data_value_copy_t&& c)
      noexcept(std::is_nothrow_move_constructible_v<value_type>)
      : ttg_data_copy_container_setter(this)
      , ttg_data_copy_t(std::move(c))
      , m_value(std::move(c.m_value))
      {
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
      }

      ttg_data_value_copy_t(const ttg_data_value_copy_t& c)
      noexcept(std::is_nothrow_copy_constructible_v<value_type>)
      : ttg_data_copy_container_setter(this)
      , ttg_data_copy_t(c)
      , m_value(c.m_value)
      {
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
      }

      ttg_data_value_copy_t& operator=(ttg_data_value_copy_t&& c)
      noexcept(std::is_nothrow_move_assignable_v<value_type>)
      {
        /* set the container ptr here, will be reset below */
        ttg_data_copy_container() = this;
        ttg_data_copy_t::operator=(std::move(c));
        m_value = std::move(c.m_value);
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
        return *this;
      }

      ttg_data_value_copy_t& operator=(const ttg_data_value_copy_t& c)
      noexcept(std::is_nothrow_copy_assignable_v<value_type>)
      {
        /* set the container ptr here, will be reset below */
        ttg_data_copy_container() = this;
        ttg_data_copy_t::operator=(c);
        m_value = c.m_value;
        /* reset the container tracker */
        ttg_data_copy_container() = nullptr;
        return *this;
      }

      /* access the wrapped value */
      value_type& get_value() {
        return m_value;
      }

      /* will destruct the value */
      virtual ~ttg_data_value_copy_t() = default;

      virtual void* get_ptr() override final {
        return &m_value;
      }
    };
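
    /* Illustrative sketch (an assumption, for exposition only): how a value is typically
     * materialized as a copy and accessed through the type-erased base interface:
     *
     *   auto *copy = new ttg_data_value_copy_t<std::vector<double>>(std::vector<double>(100));
     *   auto &vec  = *static_cast<std::vector<double>*>(copy->get_ptr());
     *   copy->mark_mutable();     // a single writer task may now modify vec
     *   // ...
     *   delete copy;              // the virtual destructor destroys the wrapped vector
     */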

    /* out-of-line definitions of ttg_parsec_data_wrapper_t members;
     * they require the complete ttg_data_copy_t defined above */

    inline
    void ttg_parsec_data_wrapper_t::remove_from_owner() {
      if (nullptr != m_ttg_copy) {
        m_ttg_copy->remove_device_data(this);
        m_ttg_copy = nullptr;
      }
    }

    inline
    void ttg_parsec_data_wrapper_t::reset_parsec_data(void *ptr, size_t size) {
      if (ptr == m_data.get()) return;

      if (nullptr == ptr) {
        m_data = parsec_data_ptr(nullptr, &delete_null_parsec_data);
      } else {
        m_data = parsec_data_ptr(create_parsec_data(ptr, size), &delete_parsec_data);
      }
    }

    inline
    ttg_parsec_data_wrapper_t::ttg_parsec_data_wrapper_t()
    : m_data(nullptr, delete_null_parsec_data)
    , m_ttg_copy(detail::ttg_data_copy_container())
    {
      if (m_ttg_copy) {
        m_ttg_copy->add_device_data(this);
      }
    }

    inline
    ttg_parsec_data_wrapper_t::ttg_parsec_data_wrapper_t(ttg_parsec_data_wrapper_t&& other)
    : m_data(std::move(other.m_data))
    , m_ttg_copy(detail::ttg_data_copy_container())
    {
      /* the ttg_data_copy may have moved us already */
      //if (other.m_ttg_copy != m_ttg_copy) {
      // try to remove the old buffer from the *old* ttg_copy
      other.remove_from_owner();

      // register with the new ttg_copy
      if (nullptr != m_ttg_copy) {
        m_ttg_copy->add_device_data(this);
      }
      //} else {
      //  other.m_ttg_copy = nullptr;
      //}
    }

    inline
    ttg_parsec_data_wrapper_t& ttg_parsec_data_wrapper_t::operator=(ttg_parsec_data_wrapper_t&& other) {
      m_data = std::move(other.m_data);
      /* check whether the owning ttg_data_copy has already moved us */
      if (other.m_ttg_copy != m_ttg_copy) {
        /* remove from old ttg copy */
        other.remove_from_owner();

        if (nullptr != m_ttg_copy) {
          /* register with the new ttg_copy */
          m_ttg_copy->add_device_data(this);
        }
      }
      return *this;
    }


    inline
    ttg_parsec_data_wrapper_t::~ttg_parsec_data_wrapper_t() {
      if (nullptr != m_ttg_copy) {
        m_ttg_copy->remove_device_data(this);
        m_ttg_copy = nullptr;
      }
    }


  } // namespace detail

} // namespace ttg_parsec

#endif // TTG_DATA_COPY_H