buffer.h
Go to the documentation of this file.
1 #ifndef TTG_PARSEC_BUFFER_H
2 #define TTG_PARSEC_BUFFER_H
3 
4 #include <array>
5 #include <vector>
6 #include <parsec.h>
7 #include <parsec/data_internal.h>
8 #include <parsec/mca/device/device.h>
10 #include "ttg/parsec/parsec-ext.h"
11 #include "ttg/util/iovec.h"
12 #include "ttg/device/device.h"
13 #include "ttg/parsec/device.h"
14 
15 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
16 #include <cuda_runtime.h>
17 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
18 
19 namespace ttg_parsec {
20 
21 
22 namespace detail {
23  // fwd decl
24  template<typename T, typename A>
25  parsec_data_t* get_parsec_data(const ttg_parsec::Buffer<T, A>& db);
26 } // namespace detail
27 
39 template<typename T, typename Allocator>
41  , private Allocator {
42 
43  /* TODO: add overloads for T[]? */
44  using value_type = std::remove_all_extents_t<T>;
45  using pointer_type = std::add_pointer_t<value_type>;
46  using const_pointer_type = const std::remove_const_t<value_type>*;
47  using element_type = std::decay_t<T>;
48 
49  using allocator_traits = std::allocator_traits<Allocator>;
50  using allocator_type = typename allocator_traits::allocator_type;
51 
52  static_assert(std::is_trivially_copyable_v<element_type>,
53  "Only trivially copyable types are supported for devices.");
54 
55 private:
56  using delete_fn_t = std::function<void(element_type*)>;
57 
58  using host_data_ptr = std::add_pointer_t<element_type>;
59  host_data_ptr m_host_data = nullptr;
60  std::size_t m_count = 0;
61  bool m_owned= false;
62 
63  static void delete_non_owned(element_type *ptr) {
64  // nothing to be done, we don't own the memory
65  }
66 
67  friend parsec_data_t* detail::get_parsec_data<T, Allocator>(const ttg_parsec::Buffer<T, Allocator>&);
68 
69  allocator_type& get_allocator_reference() { return static_cast<allocator_type&>(*this); }
70 
71  element_type* allocate(std::size_t n) {
72  return allocator_traits::allocate(get_allocator_reference(), n);
73  }
74 
75  void deallocate() {
76  allocator_traits::deallocate(get_allocator_reference(), m_host_data, m_count);
77  }
78 
79 public:
80 
81  Buffer() : Buffer(nullptr, 0)
82  { }
83 
94  , allocator_type()
95  , m_host_data(allocate(n))
96  , m_count(n)
97  , m_owned(true)
98  {
99  //std::cout << "buffer " << this << " ctor count "
100  // << m_count << "(" << m_host_data << ") ttg_copy "
101  // << m_ttg_copy
102  // << " parsec_data " << m_data.get() << std::endl;
103  this->reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
104  }
105 
117  , allocator_type()
118  , m_host_data(const_cast<element_type*>(ptr))
119  , m_count(n)
120  , m_owned(false)
121  {
122  //std::cout << "buffer " << this << " ctor ptr " << ptr << "count "
123  // << m_count << "(" << m_host_data << ") ttg_copy "
124  // << m_ttg_copy
125  // << " parsec_data " << m_data.get() << std::endl;
126  this->reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
127  }
128 
129  virtual ~Buffer() {
130  if (m_owned) {
131  deallocate();
132  m_owned = false;
133  }
134  unpin(); // make sure the copies are not pinned
135  }
136 
137  /* allow moving device buffers */
139  : ttg_parsec_data_wrapper_t(std::move(db))
140  , allocator_type(std::move(db))
141  , m_host_data(db.m_host_data)
142  , m_count(db.m_count)
143  , m_owned(db.m_owned)
144  {
145  db.m_host_data = nullptr;
146  db.m_count = 0;
147  db.m_owned = false;
148  }
149 
150  /* explicitly disable copying of buffers
151  * TODO: should we allow this? What data to use?
152  */
153  Buffer(const Buffer& db) = delete;
154 
155  /* allow moving device buffers */
157  ttg_parsec_data_wrapper_t::operator=(std::move(db));
158  allocator_type::operator=(std::move(db));
159  std::swap(m_host_data, db.m_host_data);
160  std::swap(m_count, db.m_count);
161  std::swap(m_owned, db.m_owned);
162  //std::cout << "buffer " << this << " other " << &db << " mv op ttg_copy " << m_ttg_copy << std::endl;
163  //std::cout << "buffer::move-assign from " << &db << " ttg-copy " << db.m_ttg_copy
164  // << " to " << this << " ttg-copy " << m_ttg_copy
165  // << " parsec-data " << m_data.get()
166  // << std::endl;
167  /* don't update the ttg_copy, we keep the connection */
168  return *this;
169  }
170 
171  /* explicitly disable copying of buffers
172  * TODO: should we allow this? What data to use?
173  */
174  Buffer& operator=(const Buffer& db) = delete;
175 
176  /* set the current device, useful when a device
177  * buffer was modified outside of a TTG */
179  assert(is_valid());
180  int parsec_id = detail::ttg_device_to_parsec_device(device);
181  /* make sure it's a valid device */
182  assert(parsec_nb_devices > parsec_id);
183  /* make sure it's a valid copy */
184  assert(m_data->device_copies[parsec_id] != nullptr);
185  m_data->owner_device = parsec_id;
186  }
187 
188  /* Get the owner device ID, i.e., the last updated
189  * device buffer. */
191  assert(is_valid());
192  return detail::parsec_device_to_ttg_device(m_data->owner_device);
193  }
194 
195  /* Get the pointer on the currently active device. */
197  assert(is_valid());
199  return static_cast<pointer_type>(m_data->device_copies[device_id]->device_private);
200  }
201 
202  /* Get the pointer on the currently active device. */
204  assert(is_valid());
206  return static_cast<const_pointer_type>(m_data->device_copies[device_id]->device_private);
207  }
208 
209  /* Get the pointer on the owning device.
210  * @note: This may not be the device assigned to the currently executing task.
211  * See \ref ttg::device::current_device for that. */
213  assert(is_valid());
214  return static_cast<pointer_type>(m_data->device_copies[m_data->owner_device]->device_private);
215  }
216 
217  /* get the current device pointer */
219  assert(is_valid());
220  return static_cast<const_pointer_type>(m_data->device_copies[m_data->owner_device]->device_private);
221  }
222 
223  /* get the device pointer at the given device
224  */
226  assert(is_valid());
227  int device_id = detail::ttg_device_to_parsec_device(device);
228  return static_cast<pointer_type>(parsec_data_get_ptr(m_data.get(), device_id));
229  }
230 
231  /* get the device pointer at the given device
232  */
234  assert(is_valid());
235  int device_id = detail::ttg_device_to_parsec_device(device);
236  return static_cast<const_pointer_type>(parsec_data_get_ptr(m_data.get(), device_id));
237  }
238 
240  return static_cast<pointer_type>(parsec_data_get_ptr(m_data.get(), 0));
241  }
242 
244  return static_cast<const_pointer_type>(parsec_data_get_ptr(m_data.get(), 0));
245  }
246 
247  bool is_valid_on(const ttg::device::Device& device) const {
248  assert(is_valid());
249  int device_id = detail::ttg_device_to_parsec_device(device);
250  return (parsec_data_get_ptr(m_data.get(), device_id) != nullptr);
251  }
252 
253  void allocate_on(const ttg::device::Device& device_id) {
254  /* TODO: need exposed PaRSEC memory allocator */
255  throw std::runtime_error("not implemented yet");
256  }
257 
258  /* TODO: can we do this automatically?
259  * Pin the memory on all devices we currently track.
260  * Pinned memory won't be released by PaRSEC and can be used
261  * at any time.
262  */
263  void pin() {
264  for (int i = 1; i < parsec_nb_devices; ++i) {
265  pin_on(i);
266  }
267  }
268 
269  /* Unpin the memory on all devices we currently track. */
270  void unpin() {
271  if (!is_valid()) return;
272  for (int i = 0; i < parsec_nb_devices-detail::first_device_id; ++i) {
273  unpin_on(i);
274  }
275  }
276 
277  /* Pin the memory on a given device */
278  void pin_on(int device_id) {
279  /* TODO: how can we pin memory on a device? */
280  }
281 
282  /* Pin the memory on a given device */
283  void unpin_on(int device_id) {
284  /* TODO: how can we unpin memory on a device? */
285  }
286 
287  bool is_valid() const {
288  return !!m_data;
289  }
290 
291  operator bool() const {
292  return is_valid();
293  }
294 
295  std::size_t size() const {
296  return m_count;
297  }
298 
299  /* Reallocate the buffer with count elements */
300  void reset(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) {
301  /* TODO: can we resize if count is smaller than m_count? */
302 
303  if (m_owned) {
304  deallocate();
305  m_owned = false;
306  }
307 
308  if (n == 0) {
309  m_host_data = nullptr;
310  m_owned = false;
311  } else {
312  m_host_data = allocate(n);
313  m_owned = true;
314  }
315  reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
316  //std::cout << "buffer::reset(" << count << ") ptr " << m_host_data.get()
317  // << " ttg_copy " << m_ttg_copy
318  // << " parsec_data " << m_data.get() << std::endl;
319  m_count = n;
320  }
321 
322  /* Reset the buffer to use the ptr to count elements */
323  void reset(pointer_type ptr, std::size_t n = 1, ttg::scope scope = ttg::scope::SyncIn) {
324  /* TODO: can we resize if count is smaller than m_count? */
325  if (n == m_count) {
326  return;
327  }
328 
329  if (m_owned) {
330  deallocate();
331  }
332 
333  if (nullptr == ptr) {
334  m_host_data = nullptr;
335  m_count = 0;
336  m_owned = false;
337  } else {
338  m_host_data = ptr;
339  m_count = n;
340  m_owned = false;
341  }
342  reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
343  //std::cout << "buffer::reset(" << ptr << ", " << count << ") ptr " << m_host_data.get()
344  // << " ttg_copy " << m_ttg_copy
345  // << " parsec_data " << m_data.get() << std::endl;
346  }
347 
356  if (scope == ttg::scope::Allocate) {
357  m_data->device_copies[0]->version = 0;
358  } else {
359  m_data->device_copies[0]->version = 1;
360  /* reset all other copies to force a sync-in */
361  for (int i = 0; i < parsec_nb_devices; ++i) {
362  if (m_data->device_copies[i] != nullptr) {
363  m_data->device_copies[i]->version = 0;
364  }
365  }
366  m_data->owner_device = 0;
367  }
368  }
369 
371  /* only set device if the host has the latest copy as otherwise we might end up with a stale copy */
372  if (dev.is_device() && this->parsec_data()->owner_device == 0) {
373  parsec_advise_data_on_device(this->parsec_data(), detail::ttg_device_to_parsec_device(dev),
374  PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
375  }
376  }
377 
378  void add_device(ttg::device::Device dev, pointer_type ptr, bool is_current = false) {
379  if (is_valid_on(dev)) {
380  throw std::runtime_error("Unable to add device that has already a buffer set!");
381  }
383  if (is_current) {
384  // mark the data as being current on the new device
385  parsec_data()->owner_device = detail::ttg_device_to_parsec_device(dev);
386  }
387  }
388 
389  /* serialization support */
390 
391 #ifdef TTG_SERIALIZATION_SUPPORTS_BOOST
392  template <typename Archive>
393  void serialize(Archive& ar, const unsigned int version) {
394  if constexpr (ttg::detail::is_output_archive_v<Archive>) {
395  std::size_t s = size();
396  ar& s;
397  assert(m_ttg_copy != nullptr); // only tracked objects allowed
399  } else {
400  std::size_t s;
401  ar & s;
402  /* initialize internal pointers and then reset */
403  reset(s);
404  assert(m_ttg_copy != nullptr); // only tracked objects allowed
406  }
407  }
408 #endif // TTG_SERIALIZATION_SUPPORTS_BOOST
409 
410 #ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS
411  template <typename Archive>
412  std::enable_if_t<std::is_base_of_v<madness::archive::BufferInputArchive, Archive> ||
413  std::is_base_of_v<madness::archive::BufferOutputArchive, Archive>>
414  serialize(Archive& ar) {
415  if constexpr (ttg::detail::is_output_archive_v<Archive>) {
416  std::size_t s = size();
417  ar& s;
418  assert(m_ttg_copy != nullptr); // only tracked objects allowed
419  /* transfer from the current device
420  * note: if the transport layer (MPI) does not support device transfers
421  * the data will have been pushed out */
423  } else {
424  std::size_t s;
425  ar & s;
426  //std::cout << "serialize(IN) buffer " << this << " size " << s << std::endl;
427  /* initialize internal pointers and then reset */
428  reset(s);
429  assert(m_ttg_copy != nullptr); // only tracked objects allowed
430  /* transfer to the current device
431  * TODO: how can we make sure the device copy is not evicted? */
433  }
434  }
435 #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS
436 
437 
438 };
439 
440 namespace detail {
441  template<typename T, typename A>
442  parsec_data_t* get_parsec_data(const ttg_parsec::Buffer<T, A>& db) {
443  return const_cast<parsec_data_t*>(db.m_data.get());
444  }
445 } // namespace detail
446 
447 } // namespace ttg_parsec
448 
449 #endif // TTG_PARSEC_BUFFER_H
Represents a device in a specific execution space.
Definition: device.h:14
bool is_device() const
Definition: device.h:43
Device current_device()
Definition: device.h:131
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
int first_device_id
Definition: device.h:12
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
parsec_data_t * get_parsec_data(const ttg_parsec::Buffer< T, A > &db)
Definition: buffer.h:442
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
scope
Definition: devicescope.h:5
std::array< int, 3 > version()
Definition: version.cc:4
std::decay_t< T > element_type
Definition: buffer.h:47
Buffer(Buffer &&db)
Definition: buffer.h:138
std::add_pointer_t< value_type > pointer_type
Definition: buffer.h:45
const_pointer_type device_ptr_on(const ttg::device::Device &device) const
Definition: buffer.h:233
void prefer_device(ttg::device::Device dev)
Definition: buffer.h:370
void set_current_device(const ttg::device::Device &device)
Definition: buffer.h:178
std::size_t size() const
Definition: buffer.h:295
const_pointer_type owner_device_ptr() const
Definition: buffer.h:218
bool is_valid() const
Definition: buffer.h:287
virtual ~Buffer()
Definition: buffer.h:129
bool is_valid_on(const ttg::device::Device &device) const
Definition: buffer.h:247
pointer_type host_ptr()
Definition: buffer.h:239
Buffer(const Buffer &db)=delete
ttg::device::Device get_owner_device() const
Definition: buffer.h:190
Buffer(pointer_type ptr, std::size_t n=1, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:115
void reset_scope(ttg::scope scope)
Definition: buffer.h:355
pointer_type device_ptr_on(const ttg::device::Device &device)
Definition: buffer.h:225
Buffer & operator=(const Buffer &db)=delete
const_pointer_type host_ptr() const
Definition: buffer.h:243
const_pointer_type current_device_ptr() const
Definition: buffer.h:203
std::allocator_traits< Allocator > allocator_traits
Definition: buffer.h:49
void add_device(ttg::device::Device dev, pointer_type ptr, bool is_current=false)
Definition: buffer.h:378
void pin_on(int device_id)
Definition: buffer.h:278
std::remove_all_extents_t< T > value_type
Definition: buffer.h:44
void allocate_on(const ttg::device::Device &device_id)
Definition: buffer.h:253
typename allocator_traits::allocator_type allocator_type
Definition: buffer.h:50
pointer_type owner_device_ptr()
Definition: buffer.h:212
const std::remove_const_t< value_type > * const_pointer_type
Definition: buffer.h:46
Buffer(std::size_t n, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:92
pointer_type current_device_ptr()
Definition: buffer.h:196
void reset(std::size_t n, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:300
void unpin_on(int device_id)
Definition: buffer.h:283
Buffer & operator=(Buffer &&db)
Definition: buffer.h:156
void reset(pointer_type ptr, std::size_t n=1, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:323
void iovec_add(const ttg::iovec &iov)
void add_copy(int parsec_dev, void *ptr)
void reset_parsec_data(void *ptr, size_t size, bool sync_to_device)