buffer.h
Go to the documentation of this file.
1 #ifndef TTG_PARSEC_BUFFER_H
2 #define TTG_PARSEC_BUFFER_H
3 
4 #include <array>
5 #include <vector>
6 #include <parsec.h>
7 #include <parsec/data_internal.h>
8 #include <parsec/mca/device/device.h>
10 #include "ttg/parsec/parsec-ext.h"
11 #include "ttg/util/iovec.h"
12 #include "ttg/device/device.h"
13 #include "ttg/parsec/device.h"
14 #include "ttg/devicescope.h"
15 
16 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
17 #include <cuda_runtime.h>
18 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
19 
20 namespace ttg_parsec {
21 
22 
23 namespace detail {
24  // fwd decl
25  template<typename T, typename A>
26  parsec_data_t* get_parsec_data(const ttg_parsec::Buffer<T, A>& db);
27 } // namespace detail
28 
40 template<typename T, typename Allocator>
42  , private Allocator {
43 
44  /* TODO: add overloads for T[]? */
45  using value_type = std::remove_all_extents_t<T>;
46  using pointer_type = std::add_pointer_t<value_type>;
47  using const_pointer_type = const std::remove_const_t<value_type>*;
48  using element_type = std::decay_t<T>;
49 
50  using allocator_traits = std::allocator_traits<Allocator>;
51  using allocator_type = typename allocator_traits::allocator_type;
52 
53  static_assert(std::is_trivially_copyable_v<element_type>,
54  "Only trivially copyable types are supported for devices.");
55 
56 private:
57  using delete_fn_t = std::function<void(element_type*)>;
58 
59  using host_data_ptr = std::add_pointer_t<element_type>;
60  host_data_ptr m_host_data = nullptr;
61  std::size_t m_count = 0;
62  bool m_owned= false;
63 
64  static void delete_non_owned(element_type *ptr) {
65  // nothing to be done, we don't own the memory
66  }
67 
68  friend parsec_data_t* detail::get_parsec_data<T, Allocator>(const ttg_parsec::Buffer<T, Allocator>&);
69 
70  allocator_type& get_allocator_reference() { return static_cast<allocator_type&>(*this); }
71 
72  element_type* allocate(std::size_t n) {
73  return allocator_traits::allocate(get_allocator_reference(), n);
74  }
75 
76  void deallocate() {
77  allocator_traits::deallocate(get_allocator_reference(), m_host_data, m_count);
78  }
79 
80 public:
81 
84  , allocator_type()
85  , m_host_data(nullptr)
86  , m_count(0)
87  , m_owned(false)
88  { }
89 
100  , allocator_type()
101  , m_host_data(allocate(n))
102  , m_count(n)
103  , m_owned(true)
104  {
105  //std::cout << "buffer " << this << " ctor count "
106  // << m_count << "(" << m_host_data << ") ttg_copy "
107  // << m_ttg_copy
108  // << " parsec_data " << m_data.get() << std::endl;
109  this->reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
110  }
111 
123  , allocator_type()
124  , m_host_data(const_cast<element_type*>(ptr))
125  , m_count(n)
126  , m_owned(false)
127  {
128  //std::cout << "buffer " << this << " ctor ptr " << ptr << "count "
129  // << m_count << "(" << m_host_data << ") ttg_copy "
130  // << m_ttg_copy
131  // << " parsec_data " << m_data.get() << std::endl;
132  this->reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
133  }
134 
135  virtual ~Buffer() {
136  if (m_owned) {
137  deallocate();
138  m_owned = false;
139  }
140  unpin(); // make sure the copies are not pinned
141  }
142 
143  /* allow moving device buffers */
145  : ttg_parsec_data_wrapper_t(std::move(db))
146  , allocator_type(std::move(db))
147  , m_host_data(db.m_host_data)
148  , m_count(db.m_count)
149  , m_owned(db.m_owned)
150  {
151  db.m_host_data = nullptr;
152  db.m_count = 0;
153  db.m_owned = false;
154  }
155 
156  /* explicitly disable copying of buffers
157  * TODO: should we allow this? What data to use?
158  */
159  Buffer(const Buffer& db) = delete;
160 
161  /* allow moving device buffers */
163  ttg_parsec_data_wrapper_t::operator=(std::move(db));
164  allocator_type::operator=(std::move(db));
165  std::swap(m_host_data, db.m_host_data);
166  std::swap(m_count, db.m_count);
167  std::swap(m_owned, db.m_owned);
168  //std::cout << "buffer " << this << " other " << &db << " mv op ttg_copy " << m_ttg_copy << std::endl;
169  //std::cout << "buffer::move-assign from " << &db << " ttg-copy " << db.m_ttg_copy
170  // << " to " << this << " ttg-copy " << m_ttg_copy
171  // << " parsec-data " << m_data.get()
172  // << std::endl;
173  /* don't update the ttg_copy, we keep the connection */
174  return *this;
175  }
176 
177  /* explicitly disable copying of buffers
178  * TODO: should we allow this? What data to use?
179  */
180  Buffer& operator=(const Buffer& db) = delete;
181 
182  /* set the current device, useful when a device
183  * buffer was modified outside of a TTG */
185  assert(is_valid());
186  int parsec_id = detail::ttg_device_to_parsec_device(device);
187  /* make sure it's a valid device */
188  assert(parsec_nb_devices > parsec_id);
189  /* make sure it's a valid copy */
190  assert(m_data->device_copies[parsec_id] != nullptr);
191  m_data->owner_device = parsec_id;
192  }
193 
195  if (empty()) return true; // empty is current everywhere
196  int parsec_id = detail::ttg_device_to_parsec_device(dev);
197  uint32_t max_version = 0;
198  for (int i = 0; i < parsec_nb_devices; ++i) {
199  if (nullptr == m_data->device_copies[i]) continue;
200  max_version = std::max(max_version, m_data->device_copies[i]->version);
201  }
202  return (m_data->device_copies[parsec_id] &&
203  m_data->device_copies[parsec_id]->version == max_version);
204  }
205 
206  /* Get the owner device ID, i.e., the last updated
207  * device buffer.
208  * NOTE: there may be more than one device with the current
209  * data so the result may not always be what is expected.
210  * Use is_current_on() to check for a specific device. */
212  assert(is_valid());
213  if (empty()) return ttg::device::current_device(); // empty is always valid
214  return detail::parsec_device_to_ttg_device(m_data->owner_device);
215  }
216 
217  /* Get the pointer on the currently active device. */
219  assert(is_valid());
220  if (empty()) return nullptr;
222  return static_cast<pointer_type>(m_data->device_copies[device_id]->device_private);
223  }
224 
225  /* Get the pointer on the currently active device. */
227  assert(is_valid());
228  if (empty()) return nullptr;
230  return static_cast<const_pointer_type>(m_data->device_copies[device_id]->device_private);
231  }
232 
233  /* Get the pointer on the owning device.
234  * @note: This may not be the device assigned to the currently executing task.
235  * See \ref ttg::device::current_device for that. */
237  assert(is_valid());
238  if (empty()) return nullptr;
239  return static_cast<pointer_type>(m_data->device_copies[m_data->owner_device]->device_private);
240  }
241 
242  /* get the current device pointer */
244  assert(is_valid());
245  if (empty()) return nullptr;
246  return static_cast<const_pointer_type>(m_data->device_copies[m_data->owner_device]->device_private);
247  }
248 
249  /* get the device pointer at the given device
250  */
252  assert(is_valid());
253  if (empty()) return nullptr;
254  int device_id = detail::ttg_device_to_parsec_device(device);
255  return static_cast<pointer_type>(parsec_data_get_ptr(m_data.get(), device_id));
256  }
257 
258  /* get the device pointer at the given device
259  */
261  assert(is_valid());
262  if (empty()) return nullptr;
263  int device_id = detail::ttg_device_to_parsec_device(device);
264  return static_cast<const_pointer_type>(parsec_data_get_ptr(m_data.get(), device_id));
265  }
266 
268  if (empty()) return nullptr;
269  return static_cast<pointer_type>(parsec_data_get_ptr(m_data.get(), 0));
270  }
271 
273  if (empty()) return nullptr;
274  return static_cast<const_pointer_type>(parsec_data_get_ptr(m_data.get(), 0));
275  }
276 
277  bool is_valid_on(const ttg::device::Device& device) const {
278  assert(is_valid());
279  int device_id = detail::ttg_device_to_parsec_device(device);
280  return (parsec_data_get_ptr(m_data.get(), device_id) != nullptr);
281  }
282 
283  void allocate_on(const ttg::device::Device& device_id) {
284  /* TODO: need exposed PaRSEC memory allocator */
285  throw std::runtime_error("not implemented yet");
286  }
287 
288  /* TODO: can we do this automatically?
289  * Pin the memory on all devices we currently track.
290  * Pinned memory won't be released by PaRSEC and can be used
291  * at any time.
292  */
293  void pin() {
294  for (int i = 1; i < parsec_nb_devices; ++i) {
295  pin_on(i);
296  }
297  }
298 
299  /* Unpin the memory on all devices we currently track. */
300  void unpin() {
301  if (!is_valid()) return;
302  for (int i = 0; i < parsec_nb_devices-detail::first_device_id; ++i) {
303  unpin_on(i);
304  }
305  }
306 
307  /* Pin the memory on a given device */
308  void pin_on(int device_id) {
309  /* TODO: how can we pin memory on a device? */
310  }
311 
312  /* Pin the memory on a given device */
313  void unpin_on(int device_id) {
314  /* TODO: how can we unpin memory on a device? */
315  }
316 
317  bool is_valid() const {
318  return (m_count == 0 || m_data);
319  }
320 
321  operator bool() const {
322  return !empty();
323  }
324 
325  std::size_t size() const {
326  return m_count;
327  }
328 
329  bool empty() const {
330  return m_count == 0;
331  }
332 
333  /* Reallocate the buffer with count elements */
334  void reset(std::size_t n, ttg::scope scope = ttg::scope::SyncIn) {
335  /* TODO: can we resize if count is smaller than m_count? */
336 
337  if (m_owned) {
338  deallocate();
339  m_owned = false;
340  }
341 
342  if (n == 0) {
343  m_host_data = nullptr;
344  m_owned = false;
345  } else {
346  m_host_data = allocate(n);
347  m_owned = true;
348  }
349  reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
350  //std::cout << "buffer::reset(" << count << ") ptr " << m_host_data.get()
351  // << " ttg_copy " << m_ttg_copy
352  // << " parsec_data " << m_data.get() << std::endl;
353  m_count = n;
354  }
355 
356  /* Reset the buffer to use the ptr to count elements */
357  void reset(pointer_type ptr, std::size_t n = 1, ttg::scope scope = ttg::scope::SyncIn) {
358  /* TODO: can we resize if count is smaller than m_count? */
359  if (n == m_count) {
360  return;
361  }
362 
363  if (m_owned) {
364  deallocate();
365  }
366 
367  if (nullptr == ptr) {
368  m_host_data = nullptr;
369  m_count = 0;
370  m_owned = false;
371  } else {
372  m_host_data = ptr;
373  m_count = n;
374  m_owned = false;
375  }
376  reset_parsec_data(m_host_data, n*sizeof(element_type), (scope == ttg::scope::SyncIn));
377  //std::cout << "buffer::reset(" << ptr << ", " << count << ") ptr " << m_host_data.get()
378  // << " ttg_copy " << m_ttg_copy
379  // << " parsec_data " << m_data.get() << std::endl;
380  }
381 
390  if (scope == ttg::scope::Allocate) {
391  m_data->device_copies[0]->version = 0;
392  } else {
393  m_data->device_copies[0]->version = 1;
394  /* reset all other copies to force a sync-in */
395  for (int i = 0; i < parsec_nb_devices; ++i) {
396  if (m_data->device_copies[i] != nullptr) {
397  m_data->device_copies[i]->version = 0;
398  }
399  }
400  }
401  m_data->owner_device = 0;
402  }
403 
404  ttg::scope scope() const {
405  /* if the host owns the data and has a version of zero we only have to allocate data */
406  return (m_data->device_copies[0]->version == 0 && m_data->owner_device == 0)
408  }
409 
411  /* only set device if the host has the latest copy as otherwise we might end up with a stale copy */
412  if (dev.is_device() && this->parsec_data()->owner_device == 0) {
413  parsec_advise_data_on_device(this->parsec_data(), detail::ttg_device_to_parsec_device(dev),
414  PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
415  }
416  }
417 
418  void add_device(ttg::device::Device dev, pointer_type ptr, bool is_current = false) {
419  if (is_valid_on(dev)) {
420  throw std::runtime_error("Unable to add device that has already a buffer set!");
421  }
423  if (is_current) {
424  // mark the data as being current on the new device
425  parsec_data()->owner_device = detail::ttg_device_to_parsec_device(dev);
426  }
427  }
428 
429  /* serialization support */
430 
431 #ifdef TTG_SERIALIZATION_SUPPORTS_BOOST
432  template <typename Archive>
433  void serialize(Archive& ar, const unsigned int version) {
434  if constexpr (ttg::detail::is_output_archive_v<Archive>) {
435  std::size_t s = size();
436  ar& s;
437  assert(m_ttg_copy != nullptr); // only tracked objects allowed
439  } else {
440  std::size_t s;
441  ar & s;
442  /* initialize internal pointers and then reset */
443  reset(s);
444  assert(m_ttg_copy != nullptr); // only tracked objects allowed
446  }
447  }
448 #endif // TTG_SERIALIZATION_SUPPORTS_BOOST
449 
450 #ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS
451  template <typename Archive>
452  std::enable_if_t<std::is_base_of_v<madness::archive::BufferInputArchive, Archive> ||
453  std::is_base_of_v<madness::archive::BufferOutputArchive, Archive>>
454  serialize(Archive& ar) {
455  if constexpr (ttg::detail::is_output_archive_v<Archive>) {
456  std::size_t s = size();
457  ar& s;
458  assert(m_ttg_copy != nullptr); // only tracked objects allowed
459  /* transfer from the current device
460  * note: if the transport layer (MPI) does not support device transfers
461  * the data will have been pushed out */
463  } else {
464  std::size_t s;
465  ar & s;
466  //std::cout << "serialize(IN) buffer " << this << " size " << s << std::endl;
467  /* initialize internal pointers and then reset */
468  reset(s);
469  assert(m_ttg_copy != nullptr); // only tracked objects allowed
470  /* transfer to the current device
471  * TODO: how can we make sure the device copy is not evicted? */
473  }
474  }
475 #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS
476 };
477 
478 namespace detail {
479  template<typename T, typename A>
480  parsec_data_t* get_parsec_data(const ttg_parsec::Buffer<T, A>& db) {
481  return const_cast<parsec_data_t*>(db.m_data.get());
482  }
483 } // namespace detail
484 
485 } // namespace ttg_parsec
486 
487 #endif // TTG_PARSEC_BUFFER_H
Represents a device in a specific execution space.
Definition: device.h:14
bool is_device() const
Definition: device.h:43
Device current_device()
Definition: device.h:135
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
int first_device_id
Definition: device.h:12
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
parsec_data_t * get_parsec_data(const ttg_parsec::Buffer< T, A > &db)
Definition: buffer.h:480
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
scope
Definition: devicescope.h:5
std::array< int, 3 > version()
Definition: version.cc:4
std::decay_t< T > element_type
Definition: buffer.h:48
Buffer(Buffer &&db)
Definition: buffer.h:144
std::add_pointer_t< value_type > pointer_type
Definition: buffer.h:46
const_pointer_type device_ptr_on(const ttg::device::Device &device) const
Definition: buffer.h:260
void prefer_device(ttg::device::Device dev)
Definition: buffer.h:410
void set_current_device(const ttg::device::Device &device)
Definition: buffer.h:184
std::size_t size() const
Definition: buffer.h:325
const_pointer_type owner_device_ptr() const
Definition: buffer.h:243
bool is_valid() const
Definition: buffer.h:317
virtual ~Buffer()
Definition: buffer.h:135
bool is_valid_on(const ttg::device::Device &device) const
Definition: buffer.h:277
pointer_type host_ptr()
Definition: buffer.h:267
Buffer(const Buffer &db)=delete
ttg::device::Device get_owner_device() const
Definition: buffer.h:211
Buffer(pointer_type ptr, std::size_t n=1, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:121
void reset_scope(ttg::scope scope)
Definition: buffer.h:389
pointer_type device_ptr_on(const ttg::device::Device &device)
Definition: buffer.h:251
Buffer & operator=(const Buffer &db)=delete
const_pointer_type host_ptr() const
Definition: buffer.h:272
const_pointer_type current_device_ptr() const
Definition: buffer.h:226
std::allocator_traits< Allocator > allocator_traits
Definition: buffer.h:50
void add_device(ttg::device::Device dev, pointer_type ptr, bool is_current=false)
Definition: buffer.h:418
void pin_on(int device_id)
Definition: buffer.h:308
bool is_current_on(ttg::device::Device dev) const
Definition: buffer.h:194
std::remove_all_extents_t< T > value_type
Definition: buffer.h:45
void allocate_on(const ttg::device::Device &device_id)
Definition: buffer.h:283
ttg::scope scope() const
Definition: buffer.h:404
typename allocator_traits::allocator_type allocator_type
Definition: buffer.h:51
bool empty() const
Definition: buffer.h:329
pointer_type owner_device_ptr()
Definition: buffer.h:236
const std::remove_const_t< value_type > * const_pointer_type
Definition: buffer.h:47
Buffer(std::size_t n, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:98
pointer_type current_device_ptr()
Definition: buffer.h:218
void reset(std::size_t n, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:334
void unpin_on(int device_id)
Definition: buffer.h:313
Buffer & operator=(Buffer &&db)
Definition: buffer.h:162
void reset(pointer_type ptr, std::size_t n=1, ttg::scope scope=ttg::scope::SyncIn)
Definition: buffer.h:357
void iovec_add(const ttg::iovec &iov)
void add_copy(int parsec_dev, void *ptr)
void reset_parsec_data(void *ptr, size_t size, bool sync_to_device)