warpcore 0.0.1
Hashing at the Speed of Light on modern CUDA-accelerators
gpu_engine.cuh
#ifndef WARPCORE_GPU_ENGINE_CUH
#define WARPCORE_GPU_ENGINE_CUH

namespace warpcore
{

/*! \brief CUDA kernels
 */
namespace kernels
{

template<
    class T,
    T Val = 0>
GLOBALQUALIFIER
void memset(
    T * const arr,
    const index_t num)
{
    const index_t tid = helpers::global_thread_id();

    if(tid < num)
    {
        arr[tid] = Val;
    }
}
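// Launch sketch (illustrative, not part of this header; arr_d, n, and the
// block size are assumptions):
//   kernels::memset<std::uint32_t, 0>
//       <<<SDIV(n, 1024), 1024>>>(arr_d, n);
// where SDIV denotes a ceiling-division helper, e.g. (n + 1023) / 1024.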

template<
    class Func,
    class Core>
GLOBALQUALIFIER
void for_each(
    Func f,
    const Core core)
{
    const index_t tid = helpers::global_thread_id();

    if(tid < core.capacity())
    {
        auto&& pair = core.table_[tid];
        if(core.is_valid_key(pair.key))
        {
            f(pair.key, pair.value);
        }
    }
}
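// This overload scans the table itself, one thread per slot, applying f to
// every valid (key, value) pair. A hypothetical functor could look like
// (key_t/value_t stand in for Core::key_type and Core::value_type):
//   auto f = [] __device__ (key_t k, value_t v) { /* ... */ };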

template<
    class Func,
    class Core>
GLOBALQUALIFIER
void for_each_unique_key(
    Func f,
    const Core core)
{
    using index_type = typename Core::index_type;
    using probing_scheme_type = typename Core::probing_scheme_type;

    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < core.capacity())
    {
        // for each valid entry in the table, check whether this slot is
        // the first one the probing scheme visits for its key
        auto search_key = core.table_[gid].key;
        if(core.is_valid_key(search_key))
        {
            probing_scheme_type iter(core.capacity(), core.capacity(), group);

            for(index_type i = iter.begin(search_key, core.seed_);
                i != iter.end();
                i = iter.next())
            {
                const auto table_key = core.table_[i].key;
                const auto hit = (table_key == search_key);
                const auto hit_mask = group.ballot(hit);

                const auto leader = __ffs(hit_mask) - 1;

                // invoke f only if this slot is the first match, so each
                // distinct key is reported exactly once
                if(group.thread_rank() == leader && i == gid)
                {
                    f(table_key);
                }

                if(group.any(hit))
                {
                    return;
                }
            }
        }
    }
}
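// Note: each cooperative group re-probes the table starting from its own
// slot's key; f fires only when the first slot found during probing is the
// slot being processed, which de-duplicates keys that occupy multiple slots.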

template<
    class Func,
    class Core,
    class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void for_each(
    Func f,
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    const Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        index_t num_values;

        const auto status =
            core.for_each(f, keys_in[gid], num_values, group, probing_length);

        if(group.thread_rank() == 0)
        {
            StatusHandler::handle(status, status_out, gid);
        }
    }
}
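// Note: Core::cg_size() threads cooperate on each query key, so a launch
// must provide num_in * Core::cg_size() threads in total.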

namespace bloom_filter
{

template<class Core>
GLOBALQUALIFIER
void insert(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    Core core)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        core.insert(keys_in[gid], group);
    }
}


template<class Core>
GLOBALQUALIFIER
void retrieve(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    typename Core::value_type * const values_out,
    const Core core)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        typename Core::value_type value = core.retrieve(keys_in[gid], group);

        if(group.thread_rank() == 0)
        {
            values_out[gid] = value;
        }
    }
}
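// Usage sketch (hypothetical host code; filter, keys_d, results_d, and n
// are assumptions):
//   bloom_filter::retrieve<<<grid, block>>>(keys_d, n, results_d, filter);
// where the grid covers n * cg_size() threads. For a Bloom filter the
// retrieved value is the (possibly false-positive) membership result.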


} // namespace bloom_filter


template<class Core, class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void insert(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        const auto status =
            core.insert(keys_in[gid], group, probing_length);

        if(group.thread_rank() == 0)
        {
            StatusHandler::handle(status, status_out, gid);
        }
    }
}
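// Launch sketch (illustrative; table, keys_d, and n are assumptions):
//   kernels::insert<<<grid, block>>>(keys_d, n, table);
// probing_length and status_out keep their defaults here; pass a device
// status buffer as the final argument to collect per-key statuses.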

template<class Core, class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void insert(
    const typename Core::key_type * const keys_in,
    const typename Core::value_type * const values_in,
    const index_t num_in,
    Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        const auto status =
            core.insert(keys_in[gid], values_in[gid], group, probing_length);

        if(group.thread_rank() == 0)
        {
            StatusHandler::handle(status, status_out, gid);
        }
    }
}

template<class Core, class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void retrieve(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    typename Core::value_type * const values_out,
    const Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        typename Core::value_type value_out;

        const auto status =
            core.retrieve(keys_in[gid], value_out, group, probing_length);

        if(group.thread_rank() == 0)
        {
            if(!status.has_any())
            {
                values_out[gid] = value_out;
            }

            StatusHandler::handle(status, status_out, gid);
        }
    }
}
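// Note: values_out[gid] is written only when the per-key status carries no
// error, so output positions for unsuccessful lookups are left untouched.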

/*
template<class Core, class StatusHandler>
GLOBALQUALIFIER
void retrieve(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    typename Core::key_type * const keys_out,
    typename Core::value_type * const values_out,
    index_t * const num_out,
    const index_t probing_length,
    const Core core,
    typename StatusHandler::base_type * const status_out)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        const typename Core::key_type key_in = keys_in[gid];
        typename Core::value_type value_out;

        const auto status =
            core.retrieve(key_in, value_out, group, probing_length);

        if(group.thread_rank() == 0)
        {
            if(!status.has_any())
            {
                const auto i = helpers::atomicAggInc(num_out);
                keys_out[i] = key_in;
                values_out[i] = value_out;
            }

            StatusHandler::handle(status, status_out, gid);
        }
    }
}
*/

template<class Core, class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void retrieve(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    const index_t * const begin_offsets_in,
    const index_t * const end_offsets_in,
    typename Core::value_type * const values_out,
    const Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    using status_type = typename Core::status_type;

    if(gid < num_in)
    {
        index_t num_out;

        auto status = core.retrieve(
            keys_in[gid],
            values_out + begin_offsets_in[gid],
            num_out,
            group,
            probing_length);

        if(group.thread_rank() == 0)
        {
            const auto num_prev =
                end_offsets_in[gid] - begin_offsets_in[gid];

            // the retrieved count must match the offsets computed in a
            // preceding counting pass; a mismatch means the table changed
            // in between
            if(num_prev != num_out)
            {
                core.device_join_status(status_type::invalid_phase_overlap());
                status += status_type::invalid_phase_overlap();
            }

            StatusHandler::handle(status, status_out, gid);
        }
    }
}
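// Usage sketch (illustrative): this overload is the second phase of a
// two-pass multi-value retrieval. Run num_values() first, prefix-sum the
// per-key counts into begin/end offsets, then gather; names below are
// assumptions.
//   kernels::num_values<<<grid, block>>>(keys_d, n, total_d, counts_d, table);
//   // exclusive prefix sum over counts_d -> begin_offsets_d / end_offsets_d
//   kernels::retrieve<<<grid, block>>>(
//       keys_d, n, begin_offsets_d, end_offsets_d, values_d, table);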

template<class Core, class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void erase(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        const auto status =
            core.erase(keys_in[gid], group, probing_length);

        if(group.thread_rank() == 0)
        {
            StatusHandler::handle(status, status_out, gid);
        }
    }
}

template<class Core>
GLOBALQUALIFIER
void size(
    index_t * const num_out,
    const Core core)
{
    __shared__ index_t smem;

    const index_t tid = helpers::global_thread_id();
    const auto block = cg::this_thread_block();

    if(tid < core.capacity())
    {
        const bool empty = !core.is_valid_key(core.table_[tid].key);

        if(block.thread_rank() == 0)
        {
            smem = 0;
        }

        block.sync();

        if(!empty)
        {
            const auto active_threads = cg::coalesced_threads();

            if(active_threads.thread_rank() == 0)
            {
                atomicAdd(&smem, active_threads.size());
            }
        }

        block.sync();

        if(block.thread_rank() == 0 && smem != 0)
        {
            atomicAdd(num_out, smem);
        }
    }
}
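// Note: occupied slots are counted hierarchically. Each coalesced group of
// threads that found a non-empty slot performs one atomicAdd into shared
// memory, and one thread per block folds the block total into the global
// counter, minimizing global atomic traffic.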

// for Core = MultiBucketHashTable
template<class Core>
GLOBALQUALIFIER
void num_values(
    index_t * const num_out,
    const Core core)
{
    __shared__ index_t smem;

    const index_t tid = helpers::global_thread_id();
    const auto block = cg::this_thread_block();

    if(tid < core.capacity())
    {
        const bool empty = !core.is_valid_key(core.table_[tid].key);

        if(block.thread_rank() == 0)
        {
            smem = 0;
        }

        block.sync();

        index_t value_count = 0;
        if(!empty)
        {
            // count the occupied entries of this slot's bucket
            const auto bucket = core.table_[tid].value;
            #pragma unroll
            for(int b = 0; b < core.bucket_size(); ++b)
            {
                const auto& value = bucket[b];
                if(value != core.empty_value())
                    ++value_count;
            }

            // TODO warp reduce
            atomicAdd(&smem, value_count);
        }

        block.sync();

        if(block.thread_rank() == 0 && smem != 0)
        {
            atomicAdd(num_out, smem);
        }
    }
}

template<class Core, class StatusHandler = defaults::status_handler_t>
GLOBALQUALIFIER
void num_values(
    const typename Core::key_type * const keys_in,
    const index_t num_in,
    index_t * const num_out,
    index_t * const num_per_key_out,
    const Core core,
    const index_t probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr)
{
    const index_t tid = helpers::global_thread_id();
    const index_t gid = tid / Core::cg_size();
    const auto group =
        cg::tiled_partition<Core::cg_size()>(cg::this_thread_block());

    if(gid < num_in)
    {
        index_t num = 0;

        const auto status =
            core.num_values(keys_in[gid], num, group, probing_length);

        if(group.thread_rank() == 0)
        {
            if(num_per_key_out != nullptr)
            {
                num_per_key_out[gid] = num;
            }

            if(num != 0)
            {
                atomicAdd(num_out, num);
            }

            StatusHandler::handle(status, status_out, gid);
        }
    }
}
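// Note: num_out accumulates the total value count over all queried keys via
// atomicAdd (it should be zero-initialized before launch), while
// num_per_key_out, if non-null, receives the per-key counts used to build
// the offsets for the offset-based retrieve() above.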

} // namespace kernels

} // namespace warpcore

#endif /* WARPCORE_GPU_ENGINE_CUH */