warpcore 0.0.1
Hashing at the Speed of Light on modern CUDA-accelerators
multi_value_hash_table.cuh
Go to the documentation of this file.
1 #ifndef WARPCORE_MULTI_VALUE_HASH_TABLE_CUH
2 #define WARPCORE_MULTI_VALUE_HASH_TABLE_CUH
3 
4 #include "hash_set.cuh"
5 
6 namespace warpcore
7 {
8 
9 /*! \brief multi-value hash table
10  * \tparam Key key type ( \c std::uint32_t or \c std::uint64_t )
11  * \tparam Value value type
12  * \tparam EmptyKey key which represents an empty slot
13  * \tparam TombstoneKey key which represents an erased slot
14  * \tparam ProbingScheme probing scheme from \c warpcore::probing_schemes
15  * \tparam TableStorage memory layout from \c warpcore::storage::key_value
16  * \tparam TempMemoryBytes size of temporary storage (typically a few kB)
17  */
18 template<
19  class Key,
20  class Value,
27 {
28  static_assert(
30  "invalid key type");
31 
32  static_assert(
34  "empty key and tombstone key must not be identical");
35 
36  static_assert(
38  "not a valid probing scheme type");
39 
40  static_assert(
41  std::is_same<typename ProbingScheme::key_type, Key>::value,
42  "probing key type differs from table's key type");
43 
44  static_assert(
46  "not a valid storage type");
47 
48  static_assert(
49  std::is_same<typename TableStorage::key_type, Key>::value,
50  "storage's key type differs from table's key type");
51 
52  static_assert(
54  "storage's value type differs from table's value type");
55 
56  static_assert(
57  TempMemoryBytes >= sizeof(index_t),
58  "temporary storage must at least be of size index_type");
59 
61 
62 public:
63  using key_type = Key;
64  using value_type = Value;
68 
69  /*! \brief get empty key
70  * \return empty key
71  */
73  static constexpr key_type empty_key() noexcept
74  {
75  return EmptyKey;
76  }
77 
78  /*! \brief get tombstone key
79  * \return tombstone key
80  */
82  static constexpr key_type tombstone_key() noexcept
83  {
84  return TombstoneKey;
85  }
86 
87  /*! \brief get cooperative group size
88  * \return cooperative group size
89  */
91  static constexpr index_type cg_size() noexcept
92  {
93  return ProbingScheme::cg_size();
94  }
95 
96  /*! \brief constructor
97  * \param[in] min_capacity minimum number of slots in the hash table
98  * \param[in] seed random seed
99  * \param[in] max_values_per_key maximum number of values to store per key
100  * \param[in] no_init whether to initialize the table at construction or not
101  */
104  const index_type min_capacity,
105  const key_type seed = defaults::seed<key_type>(),
108  const bool no_init = false) noexcept :
109  status_(nullptr),
111  temp_(TempMemoryBytes / sizeof(index_type)),
112  seed_(seed),
114  num_keys_(nullptr),
115  is_copy_(false),
116  is_initialized_(false)
117  {
118  cudaMalloc(&status_, sizeof(status_type));
119  cudaMalloc(&num_keys_, sizeof(index_type));
120 
122 
123  if(!no_init) init();
124  }
125 
126  /*! \brief copy-constructor (shallow)
127  * \param[in] object to be copied
128  */
131  status_(o.status_),
132  table_(o.table_),
133  temp_(o.temp_),
134  seed_(o.seed_),
137  is_copy_(true),
139  {}
140 
141  /*! \brief move-constructor
142  * \param[in] object to be moved
143  */
146  status_(std::move(o.status_)),
147  table_(std::move(o.table_)),
148  temp_(std::move(o.temp_)),
149  seed_(std::move(o.seed_)),
154  {
155  o.is_copy_ = true;
156  }
157 
158  #ifndef __CUDA_ARCH__
159  /*! \brief destructor
160  */
163  {
164  if(!is_copy_)
165  {
166  if(status_ != nullptr) cudaFree(status_);
167  if(num_keys_ != nullptr) cudaFree(num_keys_);
168  }
169  }
170  #endif
171 
172  /*! \brief (re)initialize the hash table
173  * \param[in] stream CUDA stream in which this operation is executed in
174  */
176  void init(const cudaStream_t stream = 0) noexcept
177  {
178  is_initialized_ = false;
179 
182  {
184 
186 
188 
189  is_initialized_ = true;
190  }
191  }
192 
193  /*! \brief inserts a key into the hash table
194  * \param[in] key_in key to insert into the hash table
195  * \param[in] value_in value that corresponds to \c key_in
196  * \param[in] group cooperative group
197  * \param[in] probing_length maximum number of probing attempts
198  * \return status (per thread)
199  */
202  const key_type key_in,
203  const value_type& value_in,
204  const cg::thread_block_tile<cg_size()>& group,
205  const index_type probing_length = defaults::probing_length()) noexcept
206  {
207  if(!is_initialized_)
208  {
209  return status_type::not_initialized();
210  }
211 
212  if(!is_valid_key(key_in))
213  {
215  return status_type::invalid_key();
216  }
217 
220 
221  for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next())
222  {
223  const key_type table_key = table_[i].key;
224 
226 
228 
230  {
234  return status;
235  }
236 
237  bool success = false; // no hash collision
238 
239  while(empty_mask)
240  {
241  bool key_collision = false;
242 
243  const auto leader = ffs(empty_mask) - 1;
244 
245  if(group.thread_rank() == leader)
246  {
247  const auto old =
249 
250  success = (old == table_key);
251  key_collision = (old == key_in);
252 
253  if(success)
254  {
255  table_[i].value = value_in;
256 
257  if(num_values == 0)
258  {
260  }
261  }
262  }
263 
264  if(group.any(success))
265  {
266  return (num_values > 0) ?
268  }
269 
271 
273  {
277  return status;
278  }
279 
280  empty_mask ^= 1UL << leader;
281  }
282  }
283 
284  status_type status = (num_values > 0) ?
288  return status;
289  }
290 
291  /*! \brief insert a set of keys into the hash table
292  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
293  * \param[in] keys_in pointer to keys to insert into the hash table
294  * \param[in] values_in corresponds values to \c keys_in
295  * \param[in] num_in number of keys to insert
296  * \param[in] stream CUDA stream in which this operation is executed in
297  * \param[in] probing_length maximum number of probing attempts
298  * \param[out] status_out status information per key
299  */
300  template<class StatusHandler = defaults::status_handler_t>
302  void insert(
303  const key_type * const keys_in,
304  const value_type * const values_in,
305  const index_type num_in,
306  const cudaStream_t stream = 0,
308  typename StatusHandler::base_type * const status_out = nullptr) noexcept
309  {
310  static_assert(
312  "not a valid status handler type");
313 
314  if(!is_initialized_) return;
315 
319  }
320 
321  /*! \brief retrieves all values to a corresponding key
322  * \param[in] key_in key to retrieve from the hash table
323  * \param[out] values_out values for \c key_in
324  * \param[out] num_out number of retrieved values
325  * \param[in] group cooperative group
326  * \param[in] probing_length maximum number of probing attempts
327  * \return status (per thread)
328  */
331  const key_type key_in,
332  value_type * const values_out,
334  const cg::thread_block_tile<cg_size()>& group,
335  const index_type probing_length = defaults::probing_length()) const noexcept
336  {
337  if(values_out == nullptr)
338  {
341  return status_type::dry_run() + status;
342  }
343  else
344  {
345  return for_each([=, *this] DEVICEQUALIFIER
346  (const key_type /* key */, const value_type& value, const index_type index)
347  {
349  },
350  key_in,
351  num_out,
352  group,
354  }
355  }
356 
357  /*! \brief retrieve a set of keys from the hash table
358  * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided
359  * \note \c end_offsets_out can be \c begin_offsets_out+1
360  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
361  * \param[in] keys_in pointer to keys to retrieve from the hash table
362  * \param[in] num_in number of keys to retrieve
363  * \param[out] begin_offsets_out begin of value range for a corresponding key in \c values_out
364  * \param[out] end_offsets_out end of value range for a corresponding key in \c values_out
365  * \param[out] num_out total number of values retrieved by this operation
366  * \param[in] stream CUDA stream in which this operation is executed in
367  * \param[in] probing_length maximum number of probing attempts
368  * \param[out] status_out status information (per key)
369  */
370  template<class StatusHandler = defaults::status_handler_t>
372  void retrieve(
373  const key_type * const keys_in,
374  const index_type num_in,
376  index_type * const end_offsets_out,
377  value_type * const values_out,
379  const cudaStream_t stream = 0,
381  typename StatusHandler::base_type * const status_out = nullptr) const noexcept
382  {
383  static_assert(
385  "not a valid status handler type");
386 
387  if(!is_initialized_) return;
388 
389  // cub::DeviceScan::InclusiveSum takes input sizes of type int
390  if(num_in > index_type(std::numeric_limits<int>::max()))
391  {
393 
394  return;
395  }
396 
397  num_values(
398  keys_in,
399  num_in,
400  num_out,
402  stream,
404 
405  if(values_out != nullptr)
406  {
408 
410  nullptr,
414  num_in,
415  stream);
416 
419 
421  {
422 
424  values_out,
428  num_in,
429  stream);
430  }
431  else
432  {
433  //slow path, need extra memory. cub caching allocator???
434  void* cubtemp = nullptr;
436 
437  if(err == cudaSuccess)
438  {
440  cubtemp,
444  num_in,
445  stream);
446 
447  cudaFree(cubtemp);
448  }
449  else
450  {
452  num_out = 0;
453 
454  cudaFree(cubtemp);
455 
456  return;
457  }
458 
459 
460  }
461 
463 
465  {
467  begin_offsets_out + 1,
469  sizeof(index_type) * (num_in - 1),
470  D2D,
471  stream);
472  }
473 
476  (
477  keys_in,
478  num_in,
481  values_out,
482  *this,
484  status_out);
485  }
486  else
487  {
488  if(status_out != nullptr)
489  {
492  ([=, *this] DEVICEQUALIFIER
493  {
495 
496  if(tid < num_in)
497  {
499  }
500  });
501  }
502 
504  }
505 
506  if(stream == 0)
507  {
509  }
510  }
511 
512  /*! \brief retrieves all elements from the hash table
513  * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided
514  * \note this method implements a multi-stage dry-run mode
515  * \param[out] keys_out pointer to the set of unique keys
516  * \param[out] num_keys_out number of unique keys
517  * \param[out] begin_offsets_out begin of value range for a corresponding key in \c values_out
518  * \param[out] end_offsets_out end of value range for a corresponding key in \c values_out
519  * \param[out] values_out array which holds all retrieved values
520  * \param[out] num_values_out total number of values retrieved by this operation
521  * \param[in] stream CUDA stream in which this operation is executed in
522  */
525  key_type * const keys_out,
528  index_type * const end_offsets_out,
529  value_type * const values_out,
531  const cudaStream_t stream = 0) const noexcept
532  {
533  if(!is_initialized_) return;
534 
536 
537  if(keys_out != nullptr)
538  {
539  retrieve(
540  keys_out,
541  num_keys_out,
544  values_out,
546  stream);
547  }
548 
549  if(stream == 0)
550  {
552  }
553  }
554 
555  /*! \brief retrieve all unqiue keys
556  * \info this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided
557  * \param[out] keys_out retrieved unqiue keys
558  * \param[out] num_out numof unique keys
559  * \param[in] stream CUDA stream in which this operation is executed in
560  */
563  key_type * const keys_out,
565  const cudaStream_t stream = 0) const noexcept
566  {
567  if(!is_initialized_) return;
568 
569  if(keys_out != nullptr)
570  {
571  index_type * const tmp = temp_.get();
572  cudaMemsetAsync(tmp, 0, sizeof(index_type), stream);
573 
576  ([=] DEVICEQUALIFIER (const key_type& key)
577  {
579  keys_out[out] = key;
580  }, *this);
581 
583 
584  if(stream == 0)
585  {
587  }
588  }
589  else
590  {
593  }
594  }
595 
596  /*! \brief applies a funtion over all values of a specified key
597  * \tparam Func type of map i.e. CUDA device lambda
598  * \param[in] f map to apply
599  * \param[in] key_in key to consider
600  * \param[out] num_values_out number of values associated to \c key_in
601  * \param[in] group cooperative group
602  * \param[in] probing_length maximum number of probing attempts
603  * \return status (per thread)
604  */
605  template<class Func>
608  Func f,
609  const key_type key_in,
611  const cg::thread_block_tile<cg_size()>& group,
612  const index_type probing_length = defaults::probing_length()) const noexcept
613  {
615 
616  if(!is_valid_key(key_in))
617  {
618  num_values_out = 0;
620  return status_type::invalid_key();
621  }
622 
624 
625  index_type num = 0;
626  for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next())
627  {
628  const auto table_key = table_[i].key;
629  const auto hit = (table_key == key_in);
630  const auto hit_mask = group.ballot(hit);
631 
632  if(hit)
633  {
634  const auto j =
635  num + __popc(hit_mask & ((1UL << group.thread_rank()) - 1));
636 
637  f(key_in, table_[i].value, j);
638  }
639 
640  num += __popc(hit_mask);
641 
643  {
645 
646  if(num == 0)
647  {
649  return status_type::key_not_found();
650  }
651  else
652  {
653  return status_type::none();
654  }
655  }
656  }
657 
661  }
662 
663  /*! \brief applies a funtion over all key value pairs inside the table
664  * \tparam Func type of map i.e. CUDA device lambda
665  * \param[in] f map to apply
666  * \param[in] stream CUDA stream in which this operation is executed in
667  * \param[in] size of dynamic shared memory to reserve for this execution
668  */
669  template<class Func>
671  void for_each(
672  Func f, // TODO const?
673  const cudaStream_t stream = 0,
674  const index_type smem_bytes = 0) const noexcept
675  {
676  if(!is_initialized_) return;
677 
680  (f, *this);
681  }
682 
683  /*! \brief applies a funtion over all key value pairs
684  * \tparam Func type of map i.e. CUDA device lambda
685  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
686  * \param[in] f map to apply
687  * \param[in] keys_in keys to consider
688  * \param[in] num_in number of keys
689  * \param[in] stream CUDA stream in which this operation is executed in
690  * \param[in] probing_length maximum number of probing attempts
691  * \param[out] status_out status information (per key)
692  * \param[in] size of dynamic shared memory to reserve for this execution
693  */
694  template<class Func, class StatusHandler = defaults::status_handler_t>
696  void for_each(
697  Func f, // TODO const?
698  const key_type * const keys_in,
699  const index_type num_in,
700  const cudaStream_t stream = 0,
702  typename StatusHandler::base_type * const status_out = nullptr,
703  const index_type smem_bytes = 0) const noexcept
704  {
705  static_assert(
707  "not a valid status handler type");
708 
709  if(!is_initialized_) return;
710 
713  (f, keys_in, num_in, *this, status_out);
714  }
715 
716  /*! \brief number of unique keys inside the table
717  * \param[in] stream CUDA stream in which this operation is executed in
718  * \return number of unique keys
719  */
721  index_type num_keys(const cudaStream_t stream = 0) const noexcept
722  {
723  index_type num = 0;
724 
726 
728 
729  return num;
730  }
731 
732  /*! \brief total number of values inside the table
733  * \param[in] key_in key to be probed
734  * \param[out] num_out number of values associated to \c key_in*
735  * \param[in] group cooperative group
736  * \param[in] probing_length maximum number of probing attempts
737  * \return status (per thread)
738  */
741  const key_type key_in,
743  const cg::thread_block_tile<cg_size()>& group,
744  const index_type probing_length = defaults::probing_length()) const noexcept
745  {
746  return for_each([=] DEVICEQUALIFIER (
747  const key_type /* key */,
748  const value_type& /* value */,
749  const index_type /* index */) {},
750  key_in,
751  num_out,
752  group,
754  }
755 
756  /*! \brief number of values associated to a set of keys
757  * \info this function returns only \c num_out if \c num_per_key_out==nullptr
758  * \tparam StatusHandler handles returned status per key (see \c status_handlers)
759  * \param[in] keys_in keys to consider
760  * \param[in] num_in number of keys
761  * \param[out] num_out total number of values
762  * \param[out] num_per_key_out number of values per key
763  * \param[in] stream CUDA stream in which this operation is executed in
764  * \param[in] probing_length maximum number of probing attempts
765  * \param[out] status_out status information (per key)
766  */
767  template<class StatusHandler = defaults::status_handler_t>
770  const key_type * const keys_in,
771  const index_type num_in,
773  index_type * const num_per_key_out = nullptr,
774  const cudaStream_t stream = 0,
776  typename StatusHandler::base_type * const status_out = nullptr) const noexcept
777  {
778  if(!is_initialized_) return;
779 
780  // TODO check if shared memory is benefitial
781 
782  index_type * const tmp = temp_.get();
783  cudaMemsetAsync(tmp, 0, sizeof(index_type), stream);
784 
788 
790 
791  if(stream == 0)
792  {
794  }
795  }
796 
797  /*! \brief number of values stored inside the hash table
798  * \info alias for \c size()
799  * \param[in] stream CUDA stream in which this operation is executed in
800  * \return the number of values
801  */
803  index_type num_values(const cudaStream_t stream = 0) const noexcept
804  {
805  return size(stream);
806  }
807 
808  /*! \brief number of values stored inside the hash table
809  * \param[in] stream CUDA stream in which this operation is executed in
810  * \return the number of values
811  */
813  index_type size(const cudaStream_t stream = 0) const noexcept
814  {
815  if(!is_initialized_) return 0;
816 
817  index_type out;
818  index_type * tmp = temp_.get();
819 
820  cudaMemsetAsync(tmp, 0, sizeof(index_t), stream);
821 
822  kernels::size
824  (tmp, *this);
825 
827  &out,
828  tmp,
829  sizeof(index_type),
830  D2H,
831  stream);
832 
834 
835  return out;
836  }
837 
838  /*! \brief current load factor of the hash table
839  * \param[in] stream CUDA stream in which this operation is executed in
840  * \return load factor
841  */
843  float load_factor(const cudaStream_t stream = 0) const noexcept
844  {
845  return float(size(stream)) / float(capacity());
846  }
847 
848  /*! \brief current storage density of the hash table
849  * \param[in] stream CUDA stream in which this operation is executed in
850  * \return storage density
851  */
853  float storage_density(const cudaStream_t stream = 0) const noexcept
854  {
855  const index_type key_bytes = num_keys(stream) * sizeof(key_type);
858  return float(key_bytes + value_bytes) / float(table_bytes);
859  }
860 
861  /*! \brief get the capacity of the hash table
862  * \return number of slots in the hash table
863  */
865  index_type capacity() const noexcept
866  {
867  return table_.capacity();
868  }
869 
870  /*! \brief indicates if the hash table is properly initialized
871  * \return \c true iff the hash table is properly initialized
872  */
874  bool is_initialized() const noexcept
875  {
876  return is_initialized_;
877  }
878 
879  /*! \brief get the status of the hash table
880  * \param[in] stream CUDA stream in which this operation is executed in
881  * \return the status
882  */
884  status_type peek_status(const cudaStream_t stream = 0) const noexcept
885  {
887 
888  if(status_ != nullptr)
889  {
891  &status,
892  status_,
893  sizeof(status_type),
894  D2H,
895  stream);
896 
898  }
899 
900  return status;
901  }
902 
903  /*! \brief get and reset the status of the hash table
904  * \param[in] stream CUDA stream in which this operation is executed in
905  * \return the status
906  */
909  {
911 
912  if(status_ != nullptr)
913  {
915  &status,
916  status_,
917  sizeof(status_type),
918  D2H,
919  stream);
920 
922  }
923 
924  return status;
925  }
926 
927  /*! \brief checks if \c key is equal to \c EmptyKey
928  * \return \c bool
929  */
931  static constexpr bool is_empty_key(const key_type key) noexcept
932  {
933  return (key == empty_key());
934  }
935 
936  /*! \brief checks if \c key is equal to \c TombstoneKey
937  * \return \c bool
938  */
940  static constexpr bool is_tombstone_key(const key_type key) noexcept
941  {
942  return (key == tombstone_key());
943  }
944 
945  /*! \brief checks if \c key is equal to \c (EmptyKey||TombstoneKey)
946  * \return \c bool
947  */
949  static constexpr bool is_valid_key(const key_type key) noexcept
950  {
951  return (key != empty_key() && key != tombstone_key());
952  }
953 
954  /*! \brief indicates if this object is a shallow copy
955  * \return \c bool
956  */
958  bool is_copy() const noexcept
959  {
960  return is_copy_;
961  }
962 
963 private:
964  /*! \brief assigns the hash table's status
965  * \info \c const on purpose
966  * \param[in] status new status
967  * \param[in] stream CUDA stream in which this operation is executed in
968  */
970  void assign_status(
971  const status_type status,
972  const cudaStream_t stream = 0) const noexcept
973  {
974  if(status_ != nullptr)
975  {
977  status_,
978  &status,
979  sizeof(status_type),
980  H2D,
981  stream);
982 
984  }
985  }
986 
987  /*! \brief joins additional flags to the hash table's status
988  * \info \c const on purpose
989  * \param[in] status new status
990  * \param[in] stream CUDA stream in which this operation is executed in
991  */
993  void join_status(
994  const status_type status,
995  const cudaStream_t stream = 0) const noexcept
996  {
997  if(status_ != nullptr)
998  {
1000  const status_type joined = peeked + status;
1001 
1002  if(joined != peeked)
1003  {
1005  }
1006  }
1007  }
1008 
1009  /*! \brief joins additional flags to the hash table's status
1010  * \info \c const on purpose
1011  * \param[in] status new status
1012  */
1014  void device_join_status(const status_type status) const noexcept
1015  {
1016  if(status_ != nullptr)
1017  {
1019  }
1020  }
1021 
1022  status_type * status_; //< pointer to status
1023  TableStorage table_; //< actual key/value storage
1024  temp_type temp_; //< temporary memory
1025  key_type seed_; //< random seed
1026  index_type max_values_per_key_; //< maximum number of values to store per key
1027  index_type * num_keys_; //< pointer to the count of unique keys
1028  bool is_copy_; //< indicates if table is a shallow copy
1029  bool is_initialized_; //< indicates if table is properly initialized
1030 
1031  template<class Core>
1033  friend void kernels::size(index_type * const, const Core);
1034 
1035  template<class Func, class Core>
1037  friend void kernels::for_each(Func, const Core);
1038 
1039  template<class Func, class Core>
1041  friend void kernels::for_each_unique_key(Func, const Core);
1042 
1043  template<class Core, class StatusHandler>
1045  friend void kernels::retrieve(
1046  const typename Core::key_type * const,
1047  const index_type,
1048  const index_type * const,
1049  const index_type * const,
1050  typename Core::value_type * const,
1051  const Core,
1052  const index_type,
1053  typename StatusHandler::base_type * const);
1054 
1055 }; // class MultiValueHashTable
1056 
1057 } // namespace warpcore
1058 
1059 #endif /* WARPCORE_MULTI_VALUE_HASH_TABLE_CUH */