buffer.h
Go to the documentation of this file.
1 #ifndef TTG_PARSEC_BUFFER_H
2 #define TTG_PARSEC_BUFFER_H
3 
4 #include <array>
5 #include <vector>
6 #include <parsec.h>
7 #include <parsec/data_internal.h>
8 #include <parsec/mca/device/device.h>
10 #include "ttg/parsec/parsec-ext.h"
11 #include "ttg/util/iovec.h"
12 #include "ttg/device/device.h"
13 #include "ttg/parsec/device.h"
14 
15 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
16 #include <cuda_runtime.h>
17 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
18 
19 namespace ttg_parsec {
20 
21 
22 namespace detail {
23  // fwd decl
24  template<typename T, typename A>
25  parsec_data_t* get_parsec_data(const ttg_parsec::Buffer<T, A>& db);
26 } // namespace detail
27 
39 template<typename T, typename Allocator>
41  , private Allocator {
42 
43  using element_type = std::decay_t<T>;
44 
45  using allocator_traits = std::allocator_traits<Allocator>;
46  using allocator_type = typename allocator_traits::allocator_type;
47 
48  static_assert(std::is_trivially_copyable_v<element_type>,
49  "Only trivially copyable types are supported for devices.");
50  static_assert(std::is_default_constructible_v<element_type>,
51  "Only default constructible types are supported for devices.");
52 
53 private:
54  using delete_fn_t = std::function<void(element_type*)>;
55 
56  using host_data_ptr = std::add_pointer_t<element_type>;
57  host_data_ptr m_host_data = nullptr;
58  std::size_t m_count = 0;
59  bool m_owned= false;
60 
61  static void delete_non_owned(element_type *ptr) {
62  // nothing to be done, we don't own the memory
63  }
64 
65  friend parsec_data_t* detail::get_parsec_data<T, Allocator>(const ttg_parsec::Buffer<T, Allocator>&);
66 
67  allocator_type& get_allocator_reference() { return static_cast<allocator_type&>(*this); }
68 
69  element_type* allocate(std::size_t n) {
70  return allocator_traits::allocate(get_allocator_reference(), n);
71  }
72 
73  void deallocate() {
74  allocator_traits::deallocate(get_allocator_reference(), m_host_data, m_count);
75  }
76 
77 public:
78 
79  Buffer() : Buffer(nullptr, 0)
80  { }
81 
82  Buffer(std::size_t n)
84  , allocator_type()
85  , m_host_data(allocate(n))
86  , m_count(n)
87  , m_owned(true)
88  {
89  //std::cout << "buffer " << this << " ctor count "
90  // << m_count << "(" << m_host_data << ") ttg_copy "
91  // << m_ttg_copy
92  // << " parsec_data " << m_data.get() << std::endl;
93  this->reset_parsec_data(m_host_data, n*sizeof(element_type));
94  }
95 
96  /* Constructing a buffer using application-managed memory.
97  * The memory pointed to by ptr must be accessible during
98  * the life-time of the buffer. */
99  Buffer(element_type* ptr, std::size_t n = 1)
101  , allocator_type()
102  , m_host_data(ptr)
103  , m_count(n)
104  , m_owned(false)
105  {
106  //std::cout << "buffer " << this << " ctor ptr " << ptr << "count "
107  // << m_count << "(" << m_host_data << ") ttg_copy "
108  // << m_ttg_copy
109  // << " parsec_data " << m_data.get() << std::endl;
110  this->reset_parsec_data(m_host_data, n*sizeof(element_type));
111  }
112 
113  virtual ~Buffer() {
114  if (m_owned) {
115  deallocate();
116  m_owned = false;
117  }
118  unpin(); // make sure the copies are not pinned
119  }
120 
121  /* allow moving device buffers */
123  : ttg_parsec_data_wrapper_t(std::move(db))
124  , allocator_type(std::move(db))
125  , m_host_data(db.m_host_data)
126  , m_count(db.m_count)
127  , m_owned(db.m_owned)
128  {
129  db.m_host_data = nullptr;
130  db.m_count = 0;
131  db.m_owned = false;
132  }
133 
134  /* explicitly disable copying of buffers
135  * TODO: should we allow this? What data to use?
136  */
137  Buffer(const Buffer& db) = delete;
138 
139  /* allow moving device buffers */
141  ttg_parsec_data_wrapper_t::operator=(std::move(db));
142  allocator_type::operator=(std::move(db));
143  std::swap(m_host_data, db.m_host_data);
144  std::swap(m_count, db.m_count);
145  std::swap(m_owned, db.m_owned);
146  //std::cout << "buffer " << this << " other " << &db << " mv op ttg_copy " << m_ttg_copy << std::endl;
147  //std::cout << "buffer::move-assign from " << &db << " ttg-copy " << db.m_ttg_copy
148  // << " to " << this << " ttg-copy " << m_ttg_copy
149  // << " parsec-data " << m_data.get()
150  // << std::endl;
151  /* don't update the ttg_copy, we keep the connection */
152  return *this;
153  }
154 
155  /* explicitly disable copying of buffers
156  * TODO: should we allow this? What data to use?
157  */
158  Buffer& operator=(const Buffer& db) = delete;
159 
160  /* set the current device, useful when a device
161  * buffer was modified outside of a TTG */
163  assert(is_valid());
164  int parsec_id = detail::ttg_device_to_parsec_device(device);
165  /* make sure it's a valid device */
166  assert(parsec_nb_devices > parsec_id);
167  /* make sure it's a valid copy */
168  assert(m_data->device_copies[parsec_id] != nullptr);
169  m_data->owner_device = parsec_id;
170  }
171 
172  /* Get the owner device ID, i.e., the last updated
173  * device buffer. */
175  assert(is_valid());
176  return detail::parsec_device_to_ttg_device(m_data->owner_device);
177  }
178 
179  /* Get the pointer on the currently active device. */
181  assert(is_valid());
183  return static_cast<element_type*>(m_data->device_copies[device_id]->device_private);
184  }
185 
186  /* Get the pointer on the currently active device. */
188  assert(is_valid());
190  return static_cast<element_type*>(m_data->device_copies[device_id]->device_private);
191  }
192 
193  /* Get the pointer on the owning device.
194  * @note: This may not be the device assigned to the currently executing task.
195  * See \ref ttg::device::current_device for that. */
197  assert(is_valid());
198  return static_cast<element_type*>(m_data->device_copies[m_data->owner_device]->device_private);
199  }
200 
201  /* get the current device pointer */
203  assert(is_valid());
204  return static_cast<element_type*>(m_data->device_copies[m_data->owner_device]->device_private);
205  }
206 
207  /* get the device pointer at the given device
208  */
210  assert(is_valid());
211  int device_id = detail::ttg_device_to_parsec_device(device);
212  return static_cast<element_type*>(parsec_data_get_ptr(m_data.get(), device_id));
213  }
214 
215  /* get the device pointer at the given device
216  */
217  const element_type* device_ptr_on(const ttg::device::Device& device) const {
218  assert(is_valid());
219  int device_id = detail::ttg_device_to_parsec_device(device);
220  return static_cast<element_type*>(parsec_data_get_ptr(m_data.get(), device_id));
221  }
222 
224  return static_cast<element_type*>(parsec_data_get_ptr(m_data.get(), 0));
225  }
226 
227  const element_type* host_ptr() const {
228  return static_cast<element_type*>(parsec_data_get_ptr(m_data.get(), 0));
229  }
230 
231  bool is_valid_on(const ttg::device::Device& device) const {
232  assert(is_valid());
233  int device_id = detail::ttg_device_to_parsec_device(device);
234  return (parsec_data_get_ptr(m_data.get(), device_id) != nullptr);
235  }
236 
237  void allocate_on(const ttg::device::Device& device_id) {
238  /* TODO: need exposed PaRSEC memory allocator */
239  throw std::runtime_error("not implemented yet");
240  }
241 
242  /* TODO: can we do this automatically?
243  * Pin the memory on all devices we currently track.
244  * Pinned memory won't be released by PaRSEC and can be used
245  * at any time.
246  */
247  void pin() {
248  for (int i = 1; i < parsec_nb_devices; ++i) {
249  pin_on(i);
250  }
251  }
252 
253  /* Unpin the memory on all devices we currently track. */
254  void unpin() {
255  if (!is_valid()) return;
256  for (int i = 0; i < parsec_nb_devices-detail::first_device_id; ++i) {
257  unpin_on(i);
258  }
259  }
260 
261  /* Pin the memory on a given device */
262  void pin_on(int device_id) {
263  /* TODO: how can we pin memory on a device? */
264  }
265 
266  /* Pin the memory on a given device */
267  void unpin_on(int device_id) {
268  /* TODO: how can we unpin memory on a device? */
269  }
270 
271  bool is_valid() const {
272  return !!m_data;
273  }
274 
275  operator bool() const {
276  return is_valid();
277  }
278 
279  std::size_t size() const {
280  return m_count;
281  }
282 
283  /* Reallocate the buffer with count elements */
284  void reset(std::size_t n) {
285  /* TODO: can we resize if count is smaller than m_count? */
286 
287  if (m_owned) {
288  deallocate();
289  m_owned = false;
290  }
291 
292  if (n == 0) {
293  m_host_data = nullptr;
294  m_owned = false;
295  } else {
296  m_host_data = allocate(n);
297  m_owned = true;
298  }
299  reset_parsec_data(m_host_data, n*sizeof(element_type));
300  //std::cout << "buffer::reset(" << count << ") ptr " << m_host_data.get()
301  // << " ttg_copy " << m_ttg_copy
302  // << " parsec_data " << m_data.get() << std::endl;
303  m_count = n;
304  }
305 
306  /* Reset the buffer to use the ptr to count elements */
307  void reset(T* ptr, std::size_t n = 1) {
308  /* TODO: can we resize if count is smaller than m_count? */
309  if (n == m_count) {
310  return;
311  }
312 
313  if (m_owned) {
314  deallocate();
315  }
316 
317  if (nullptr == ptr) {
318  m_host_data = nullptr;
319  m_count = 0;
320  m_owned = false;
321  } else {
322  m_host_data = ptr;
323  m_count = n;
324  m_owned = false;
325  }
326  reset_parsec_data(m_host_data, n*sizeof(element_type));
327  //std::cout << "buffer::reset(" << ptr << ", " << count << ") ptr " << m_host_data.get()
328  // << " ttg_copy " << m_ttg_copy
329  // << " parsec_data " << m_data.get() << std::endl;
330  }
331 
333  /* only set device if the host has the latest copy as otherwise we might end up with a stale copy */
334  if (dev.is_device() && this->parsec_data()->owner_device == 0) {
335  parsec_advise_data_on_device(this->parsec_data(), detail::ttg_device_to_parsec_device(dev),
336  PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
337  }
338  }
339 
340  /* serialization support */
341 
342 #ifdef TTG_SERIALIZATION_SUPPORTS_BOOST
343  template <typename Archive>
344  void serialize(Archive& ar, const unsigned int version) {
345  if constexpr (ttg::detail::is_output_archive_v<Archive>) {
346  std::size_t s = size();
347  ar& s;
348  assert(m_ttg_copy != nullptr); // only tracked objects allowed
350  } else {
351  std::size_t s;
352  ar & s;
353  /* initialize internal pointers and then reset */
354  reset(s);
355  assert(m_ttg_copy != nullptr); // only tracked objects allowed
357  }
358  }
359 #endif // TTG_SERIALIZATION_SUPPORTS_BOOST
360 
361 #ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS
362  template <typename Archive>
363  std::enable_if_t<std::is_base_of_v<madness::archive::BufferInputArchive, Archive> ||
364  std::is_base_of_v<madness::archive::BufferOutputArchive, Archive>>
365  serialize(Archive& ar) {
366  if constexpr (ttg::detail::is_output_archive_v<Archive>) {
367  std::size_t s = size();
368  ar& s;
369  assert(m_ttg_copy != nullptr); // only tracked objects allowed
370  /* transfer from the current device
371  * note: if the transport layer (MPI) does not support device transfers
372  * the data will have been pushed out */
374  } else {
375  std::size_t s;
376  ar & s;
377  //std::cout << "serialize(IN) buffer " << this << " size " << s << std::endl;
378  /* initialize internal pointers and then reset */
379  reset(s);
380  assert(m_ttg_copy != nullptr); // only tracked objects allowed
381  /* transfer to the current device
382  * TODO: how can we make sure the device copy is not evicted? */
384  }
385  }
386 #endif // TTG_SERIALIZATION_SUPPORTS_MADNESS
387 
388 
389 };
390 
391 namespace detail {
392  template<typename T, typename A>
393  parsec_data_t* get_parsec_data(const ttg_parsec::Buffer<T, A>& db) {
394  return const_cast<parsec_data_t*>(db.m_data.get());
395  }
396 } // namespace detail
397 
398 } // namespace ttg_parsec
399 
400 #endif // TTG_PARSEC_BUFFER_H
Represents a device in a specific execution space.
Definition: device.h:23
bool is_device() const
Definition: device.h:52
Device current_device()
Definition: device.h:173
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
int first_device_id
Definition: device.h:12
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
parsec_data_t * get_parsec_data(const ttg_parsec::Buffer< T, A > &db)
Definition: buffer.h:393
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
std::array< int, 3 > version()
Definition: version.cc:4
std::decay_t< T > element_type
Definition: buffer.h:43
element_type * current_device_ptr()
Definition: buffer.h:180
Buffer(element_type *ptr, std::size_t n=1)
Definition: buffer.h:99
element_type * owner_device_ptr()
Definition: buffer.h:196
Buffer(Buffer &&db)
Definition: buffer.h:122
void prefer_device(ttg::device::Device dev)
Definition: buffer.h:332
void set_current_device(const ttg::device::Device &device)
Definition: buffer.h:162
std::size_t size() const
Definition: buffer.h:279
bool is_valid() const
Definition: buffer.h:271
virtual ~Buffer()
Definition: buffer.h:113
void reset(T *ptr, std::size_t n=1)
Definition: buffer.h:307
Buffer(std::size_t n)
Definition: buffer.h:82
bool is_valid_on(const ttg::device::Device &device) const
Definition: buffer.h:231
element_type * device_ptr_on(const ttg::device::Device &device)
Definition: buffer.h:209
Buffer(const Buffer &db)=delete
ttg::device::Device get_owner_device() const
Definition: buffer.h:174
Buffer & operator=(const Buffer &db)=delete
std::allocator_traits< Allocator > allocator_traits
Definition: buffer.h:45
element_type * host_ptr()
Definition: buffer.h:223
const element_type * host_ptr() const
Definition: buffer.h:227
void reset(std::size_t n)
Definition: buffer.h:284
void pin_on(int device_id)
Definition: buffer.h:262
void allocate_on(const ttg::device::Device &device_id)
Definition: buffer.h:237
typename allocator_traits::allocator_type allocator_type
Definition: buffer.h:46
const element_type * device_ptr_on(const ttg::device::Device &device) const
Definition: buffer.h:217
const element_type * owner_device_ptr() const
Definition: buffer.h:202
const element_type * current_device_ptr() const
Definition: buffer.h:187
void unpin_on(int device_id)
Definition: buffer.h:267
Buffer & operator=(Buffer &&db)
Definition: buffer.h:140
void iovec_add(const ttg::iovec &iov)
void reset_parsec_data(void *ptr, size_t size)