warpcore 0.0.1
Hashing at the Speed of Light on modern CUDA-accelerators
hash_set.cuh
Go to the documentation of this file.
1 #ifndef WARPCORE_HASH_SET_CUH
2 #define WARPCORE_HASH_SET_CUH
3 
4 #include "base.cuh"
5 
6 namespace warpcore
7 {
8 
9 /*! \brief hash set
10  * \tparam Key key type (\c std::uint32_t or \c std::uint64_t)
11  * \tparam EmptyKey key which represents an empty slot
12  * \tparam TombstoneKey key which represents an erased slot
13  * \tparam ProbingScheme probing scheme from \c warpcore::probing_schemes
14  * \tparam TempMemoryBytes size of temporary storage (typically a few kB)
15  */
16 template<
17  class Key,
22 class HashSet
23 {
24  static_assert(
26  "invalid key type");
27 
28  static_assert(
30  "empty key and tombstone key must not be identical");
31 
32  static_assert(
34  "not a valid probing scheme type");
35 
36  static_assert(
37  std::is_same<typename ProbingScheme::key_type, Key>::value,
38  "probing key type differs from table's key type");
39 
40  static_assert(
41  TempMemoryBytes >= sizeof(index_t),
42  "temporary storage must at least be of size index_type");
43 
44 public:
45  using key_type = Key;
48 
49  /*! \brief get the sentinel key that represents an empty slot
50  * \return the \c EmptyKey template argument (never a valid key, see \c is_valid_key)
51  */
53  static constexpr key_type empty_key() noexcept
54  {
55  return EmptyKey;
56  }
57 
58  /*! \brief get the sentinel key that represents an erased slot
59  * \return the \c TombstoneKey template argument (never a valid key, see \c is_valid_key)
60  */
62  static constexpr key_type tombstone_key() noexcept
63  {
64  return TombstoneKey;
65  }
66 
67  /*! \brief get cooperative group size, i.e. the tile width of the \c cg::thread_block_tile taken by the device-side member functions
68  * \return \c ProbingScheme::cg_size()
69  */
71  static constexpr index_type cg_size() noexcept
72  {
73  return ProbingScheme::cg_size();
74  }
75 
76  /*! \brief constructor
77  * \param[in] capacity maximum cardinality of the set
78  * \param[in] seed random seed
79  */
81  explicit HashSet(
83  key_type seed = defaults::seed<key_type>()) noexcept :
84  status_(nullptr),
85  keys_(nullptr),
87  temp_(TempMemoryBytes / sizeof(index_type)),
88  seed_(seed),
89  is_initialized_(false),
90  is_copy_(false)
91  {
93 
94  const auto total_bytes = (sizeof(key_type) * capacity()) + sizeof(Status);
95 
97  {
98  cudaMalloc(&keys_, sizeof(key_type) * capacity_);
99  cudaMalloc(&status_, sizeof(Status));
100 
102  is_initialized_ = true;
103 
104  init();
105  }
106  else
107  {
109  }
110  }
111 
112  /*! \brief copy-constructor (shallow)
113  * \param[in] o object to be copied
114  */
116  HashSet(const HashSet& o) noexcept :
117  status_(o.status_),
118  keys_(o.keys_),
120  temp_(o.temp_),
121  seed_(o.seed_),
123  is_copy_(true)
124  {}
125 
126  /*! \brief move-constructor
127  * \param[in] o object to be moved
128  */
130  HashSet(HashSet&& o) noexcept :
131  status_(std::move(o.status_)),
132  keys_(std::move(o.keys_)),
134  temp_(std::move(o.temp_)),
135  seed_(std::move(o.seed_)),
138  {
139  o.is_copy_ = true;
140  }
141 
142  #ifndef __CUDA_ARCH__
143  /*! \brief destructor; frees the key store and the status object unless this object is a shallow copy
 * \note only compiled when \c __CUDA_ARCH__ is not defined; the return codes of \c cudaFree are not checked
144  */
146  ~HashSet() noexcept
147  {
148  if(!is_copy_) // copies and moved-from objects have is_copy_ set, so only one object frees the buffers
149  {
150  if(keys_ != nullptr) cudaFree(keys_);
151  if(status_ != nullptr) cudaFree(status_);
152  }
153  }
154  #endif
155 
156  /*! \brief (re)initialize the hash set
157  * \param[in] seed random seed
158  * \param[in] stream CUDA stream in which this operation is executed in
159  */
161  void init(
162  const key_type seed,
163  const cudaStream_t stream = 0) noexcept
164  {
165  seed_ = seed;
166 
167  if(is_initialized_)
168  {
171  (keys_, capacity_);
172 
174  }
175  }
176 
177  /*! \brief (re)initialize the hash set while keeping the currently stored seed; forwards to \c init(seed_, stream)
178  * \param[in] stream CUDA stream in which this operation is executed in
179  */
181  void init(const cudaStream_t stream = 0) noexcept
182  {
183  init(seed_, stream);
184  }
185 
186  /*! \brief inserts a key into the hash set
187  * \param[in] key_in key to insert into the hash set
188  * \param[in] group cooperative group
189  * \param[in] probing_length maximum number of probing attempts
190  * \return status (per thread)
191  */
195  const cg::thread_block_tile<cg_size()>& group,
197  {
198  if(!is_initialized_) return Status::not_initialized();
199 
200  if(!is_valid_key(key_in))
201  {
203  return Status::invalid_key();
204  }
205 
207 
208  for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next())
209  {
211  const bool hit = (table_key == key_in);
212  const auto hit_mask = group.ballot(hit);
213 
214  if(hit_mask)
215  {
217  return Status::duplicate_key();
218  }
219 
220  // !not_is_valid_key?
222 
223  bool success = false;
224  bool duplicate = false;
225 
226  while(empty_mask)
227  {
228  const auto leader = ffs(empty_mask) - 1;
229 
230  if(group.thread_rank() == leader)
231  {
232  const auto old = atomicCAS(keys_ + i, table_key, key_in);
233  success = (old == table_key);
234  duplicate = (old == key_in);
235  }
236 
237  if(group.any(duplicate))
238  {
240  return Status::duplicate_key();
241  }
242 
243  if(group.any(success))
244  {
245  return Status::none();
246  }
247 
248  empty_mask ^= 1UL << leader;
249  }
250  }
251 
254  }
255 
256  /*! \brief insert a set of keys into the hash set
257  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
258  * \param[in] keys_in pointer to keys to insert into the hash set
259  * \param[in] num_in number of keys to insert
260  * \param[in] stream CUDA stream in which this operation is executed in
261  * \param[in] probing_length maximum number of probing attempts
262  * \param[out] status_out status information per key
263  */
264  template<class StatusHandler = defaults::status_handler_t>
266  void insert(
267  const key_type * keys_in,
269  cudaStream_t stream = 0,
271  typename StatusHandler::base_type * status_out = nullptr) noexcept
272  {
273  static_assert(
275  "not a valid status handler type");
276 
277  if(!is_initialized_) return;
278 
282  }
283 
284  /*! \brief retrieves a key from the hash set
285  * \param[in] key_in key to retrieve from the hash set
286  * \param[out] flag_out \c true iff \c key_in is member of the set
287  * \param[in] group cooperative group
288  * \param[in] probing_length maximum number of probing attempts
289  * \return status (per thread)
290  */
293  Key key_in,
294  bool& flag_out,
295  const cg::thread_block_tile<cg_size()>& group,
296  index_type probing_length = defaults::probing_length()) const noexcept
297  {
298  if(!is_initialized_) return Status::not_initialized();
299 
300  if(!is_valid_key(key_in))
301  {
303  return Status::invalid_key();
304  }
305 
307 
308  for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next())
309  {
311  const bool hit = (table_key == key_in);
312  const auto hit_mask = group.ballot(hit);
313 
314  if(hit_mask)
315  {
316  flag_out = true;
317  return Status::none();
318  }
319 
321  {
322  flag_out = false;
323  return Status::none();
324  }
325  }
326 
329  }
330 
331  /*! \brief retrieve a set of keys from the hash table
332  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
333  * \param[in] keys_in pointer to keys to retrieve from the hash table
334  * \param[in] num_in number of keys to retrieve
335  * \param[out] flags_out flags membership of \c keys_in in the set
336  * \param[in] stream CUDA stream in which this operation is executed in
337  * \param[in] probing_length maximum number of probing attempts
338  * \param[out] status_out status information (per key)
339  */
340  template<class StatusHandler = defaults::status_handler_t>
342  void retrieve(
343  const key_type * keys_in,
345  bool * flags_out,
346  cudaStream_t stream = 0,
348  typename StatusHandler::base_type * status_out = nullptr) const noexcept
349  {
350  static_assert(
352  "not a valid status handler type");
353 
354  if(!is_initialized_) return;
355 
359  }
360 
361  /*! \brief retrieves all elements from the hash set
362  * \param[out] keys_out location to store all retrieved keys
363  * \param[out] num_out number of keys retrieved
364  * \param[in] stream CUDA stream in which this operation is executed in
365  */
368  key_type * keys_out,
370  cudaStream_t stream = 0) const noexcept
371  {
372  if(!is_initialized_) return;
373 
374  index_type * tmp = temp_.get();
375 
376  cudaMemsetAsync(tmp, 0, sizeof(index_type), stream);
377 
378  for_each([=, *this] DEVICEQUALIFIER (key_type key)
380 
382 
383  if(stream == 0)
384  {
386  }
387  }
388 
389  /*! \brief erases a key from the hash set by overwriting its slot with the tombstone key
390  * \param[in] key_in key to erase from the hash set
391  * \param[in] group cooperative group
392  * \param[in] probing_length maximum number of probing attempts
393  * \return status (per thread): \c Status::not_initialized() or \c Status::invalid_key() on the early exits, \c Status::none() on every other path visible here
 * \note NOTE(review): parts of this definition are absent from this listing (the embedded line numbers skip), so the comments below only cover the statements that are shown
394  */
398  const cg::thread_block_tile<cg_size()>& group,
400  {
401  if(!is_initialized_) return Status::not_initialized();
402 
403  if(!is_valid_key(key_in)) // the empty key and the tombstone key cannot be erased
404  {
406  return Status::invalid_key();
407  }
408 
410 
411  for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next())
412  {
414  bool hit = (table_key == key_in);
415  auto hit_mask = group.ballot(hit); // one bit per lane of the group that sees key_in
416 
417  if(hit_mask)
418  {
419  auto leader = ffs(hit_mask)-1; // presumably the lowest-ranked lane that reported a hit - confirm ffs semantics
420 
421  if(group.thread_rank() == leader)
422  {
423  keys_[i] = tombstone_key(); // plain (non-atomic) store performed by the leader lane only
424  }
425 
426  return Status::none();
427  }
428 
430  {
431  return Status::none(); // NOTE(review): this return makes the next statement unreachable
432  return Status::key_not_found(); // NOTE(review): dead code - confirm which of the two statuses is intended here
433  }
434  }
435 
436  return Status::none(); // reached when the probing loop ends without a hit; any statement after this return would be unreachable
438  }
439 
440  /*! \brief erases a set of keys from the hash table
441  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
442  * \param[in] keys_in pointer to keys to erase from the hash table
443  * \param[in] num_in number of keys to erase
444  * \param[in] stream CUDA stream in which this operation is executed in
445  * \param[in] probing_length maximum number of probing attempts
446  * \param[out] status_out status information (per key)
447  */
448  template<class StatusHandler = defaults::status_handler_t>
450  void erase(
451  key_type * keys_in,
453  cudaStream_t stream = 0,
455  typename StatusHandler::base_type * status_out = nullptr) noexcept
456  {
457  if(!is_initialized_) return;
458 
462  }
463 
464  /*! \brief applies a function on all keys inside the table
465  * \tparam Func type of map i.e. CUDA device lambda
466  * \param[in] f map to apply
467  * \param[in] stream CUDA stream in which this operation is executed in
468  * \param[in] smem_bytes size of shared memory to reserve for this execution
469  */
470  template<class Func>
472  void for_each(
473  Func f,
474  cudaStream_t stream = 0,
475  index_type smem_bytes = 0) const noexcept
476  {
477  if(!is_initialized_) return;
478 
481  ([=, *this] DEVICEQUALIFIER // TODO mutable?
482  {
484 
485  if(tid < capacity())
486  {
487  const key_type key = keys_[tid];
488  if(is_valid_key(key))
489  {
490  f(key);
491  }
492  }
493  });
494  }
495 
496  /*! \brief number of keys stored inside the hash set
 * \param[in] stream CUDA stream in which this operation is executed in
497  * \return the number of valid keys inside the hash set; 0 if the set is not initialized
 * \note NOTE(review): parts of this definition are absent from this listing (the embedded line numbers skip), so the comments below only cover the statements that are shown
498  */
500  index_type size(cudaStream_t stream = 0) const noexcept
501  {
502  if(!is_initialized_) return 0;
503 
504  index_type out;
505  index_type * tmp = temp_.get();
506 
507  cudaMemsetAsync(tmp, 0, sizeof(index_type), stream); // zero the counter the blocks accumulate into; return code is not checked
508 
511  ([=, *this] DEVICEQUALIFIER
512  {
514 
517 
518  if(tid >= capacity()) return; // NOTE(review): this early return precedes the block.sync() calls below; if 'block' is the whole thread block, confirm that exited threads cannot break the barrier
519 
520  const bool empty = !is_valid_key(keys_[tid]); // slots holding the empty or tombstone key are not counted
521 
522  if(block.thread_rank() == 0)
523  {
524  smem = 0; // reset the per-block counter before any thread contributes
525  }
526 
527  block.sync();
528 
529  if(!empty)
530  {
532 
533  if(active_threads.thread_rank() == 0)
534  {
536  }
537  }
538 
539  block.sync();
540 
541  if(block.thread_rank() == 0)
542  {
543  atomicAdd(tmp, smem); // one atomic per block adds the block's count to the counter in tmp
544  }
545  });
546 
548  &out,
549  tmp,
550  sizeof(index_type),
551  D2H,
552  stream);
553 
555 
556  return out; // NOTE(review): the statements between the copy arguments and this return are not shown; confirm 'out' is synchronized before it is read
557  }
558 
559  /*! \brief current load factor of the hash set, i.e. \c size(stream) divided by \c capacity()
560  * \param stream CUDA stream in which this operation is executed in
561  * \return load factor
 * \note NOTE(review): the division is not guarded against a capacity of zero - confirm that this cannot occur
562  */
564  float load_factor(cudaStream_t stream = 0) const noexcept
565  {
566  return float(size(stream)) / float(capacity());
567  }
568 
569  /*! \brief current storage density of the hash set; forwards to \c load_factor(stream)
570  * \param stream CUDA stream in which this operation is executed in
571  * \return storage density (the same value as the load factor)
572  */
574  float storage_density(cudaStream_t stream = 0) const noexcept
575  {
576  return load_factor(stream);
577  }
578 
579  /*! \brief get the capacity of the hash set
580  * \return number of slots in the hash set (the stored \c capacity_)
581  */
583  index_type capacity() const noexcept
584  {
585  return capacity_;
586  }
587 
588  /*! \brief get the total number of bytes occupied by this data structure
589  * \return bytes: \c capacity_ * sizeof(key_type) + \c temp_.bytes_total() + sizeof(status_type)
590  */
592  index_type bytes_total() const noexcept
593  {
594  return capacity_ * sizeof(key_type) + temp_.bytes_total() + sizeof(status_type);
595  }
596 
597  /*! \brief get the status of the hash table
598  * \param stream CUDA stream in which this operation is executed in
599  * \return the status
600  */
602  Status peek_status(cudaStream_t stream = 0) const noexcept
603  {
605 
606  if(status_ != nullptr)
607  {
609  &status,
610  status_,
611  sizeof(Status),
612  D2H,
613  stream);
614 
616  }
617 
618  return status;
619  }
620 
621  /*! \brief get and reset the status of the hash table
622  * \param stream CUDA stream in which this operation is executed in
623  * \return the status
624  */
627  {
629 
630  if(status_ != nullptr)
631  {
633  &status,
634  status_,
635  sizeof(Status),
636  D2H,
637  stream);
638 
640 
642  }
643 
644  return status;
645  }
646 
647  /*! \brief checks if \c key is equal to \c EmptyKey
 * \param[in] key key to test
648  * \return \c true iff \c key equals \c empty_key()
649  */
651  static constexpr bool is_empty_key(key_type key) noexcept
652  {
653  return (key == empty_key());
654  }
655 
656  /*! \brief checks if \c key is equal to \c TombstoneKey
 * \param[in] key key to test
657  * \return \c true iff \c key equals \c tombstone_key()
658  */
660  static constexpr bool is_tombstone_key(key_type key) noexcept
661  {
662  return (key == tombstone_key());
663  }
664 
665  /*! \brief checks if \c key is neither \c EmptyKey nor \c TombstoneKey
 * \param[in] key key to test
666  * \return \c true iff \c key differs from both \c empty_key() and \c tombstone_key()
667  */
669  static constexpr bool is_valid_key(key_type key) noexcept
670  {
671  return (key != empty_key() && key != tombstone_key());
672  }
673 
674  /*! \brief indicates if this object is a shallow copy
675  * \return the \c is_copy_ flag; it is set by the copy-constructor and, on the moved-from object, by the move-constructor
676  */
678  bool is_copy() const noexcept
679  {
680  return is_copy_;
681  }
682 
683 private:
684  /*! \brief assigns the hash set's status
685  * \param[in] status new status
686  * \param[in] stream CUDA stream in which this operation is executed in
687  */
689  void assign_status(
690  Status status,
691  cudaStream_t stream = 0) const noexcept
692  {
693  if(!is_initialized_) return;
694 
696  status_,
697  &status,
698  sizeof(Status),
699  H2D,
700  stream);
701 
703  }
704 
705  /*! \brief joins additional flags to the hash set's status
706  * \param[in] status new status
707  * \param[in] stream CUDA stream in which this operation is executed in
708  */
710  void join_status(
711  Status status,
712  cudaStream_t stream = 0) const noexcept
713  {
714  if(!is_initialized_) return;
715 
717 
719  status_,
720  &joined,
721  sizeof(Status),
722  H2D,
723  stream);
724 
726  }
727 
728  Status * status_; //!< pointer to the status object (allocated with cudaMalloc in the constructor)
729  key_type * keys_ ; //!< pointer to the key store (allocated with cudaMalloc in the constructor)
730  const index_type capacity_; //!< number of slots in the hash set
731  storage::CyclicStore<index_type> temp_; //!< temporary memory
732  key_type seed_; //!< random seed
733  bool is_initialized_; //!< indicates if the set is properly initialized
734  bool is_copy_; //!< indicates if this object is a shallow copy (such objects do not free the buffers on destruction)
735 
736 }; // class HashSet
737 
738 } // namespace warpcore
739 
740 #endif /* WARPCORE_HASH_SET_CUH */