warpcore
0.0.1
Hashing at the Speed of Light on modern CUDA-accelerators
multi_value_hash_table.cuh
Go to the documentation of this file.
1
#
ifndef
WARPCORE_MULTI_VALUE_HASH_TABLE_CUH
2
#
define
WARPCORE_MULTI_VALUE_HASH_TABLE_CUH
3
4
#
include
"hash_set.cuh"
5
6
namespace
warpcore
7
{
8
9
/*! \brief multi-value hash table
10
* \tparam Key key type ( \c std::uint32_t or \c std::uint64_t )
11
* \tparam Value value type
12
* \tparam EmptyKey key which represents an empty slot
13
* \tparam TombstoneKey key which represents an erased slot
14
* \tparam ProbingScheme probing scheme from \c warpcore::probing_schemes
15
* \tparam TableStorage memory layout from \c warpcore::storage::key_value
16
* \tparam TempMemoryBytes size of temporary storage (typically a few kB)
17
*/
18
template
<
19
class
Key
,
20
class
Value
,
21
Key
EmptyKey
=
defaults
::
empty_key
<
Key
>(),
22
Key
TombstoneKey
=
defaults
::
tombstone_key
<
Key
>(),
23
class
ProbingScheme
=
defaults
::
probing_scheme_t
<
Key
, 8>,
24
class
TableStorage
=
defaults
::
table_storage_t
<
Key
,
Value
>,
25
index_t
TempMemoryBytes
=
defaults
::
temp_memory_bytes
()>
26
class
MultiValueHashTable
27
{
28
// ---- compile-time validation of the template arguments ----

// the key type has to pass the library's key-type check
static_assert(
    checks::is_valid_key_type<Key>(),
    "invalid key type");

// both sentinel keys need distinct values
static_assert(
    EmptyKey != TombstoneKey,
    "empty key and tombstone key must not be identical");

// the probing scheme has to pass the cycle-free check
static_assert(
    checks::is_cycle_free_probing_scheme<ProbingScheme>(),
    "not a valid probing scheme type");

static_assert(
    std::is_same<typename ProbingScheme::key_type, Key>::value,
    "probing key type differs from table's key type");

// the storage has to be a key/value storage over exactly Key and Value
static_assert(
    checks::is_key_value_storage<TableStorage>(),
    "not a valid storage type");

static_assert(
    std::is_same<typename TableStorage::key_type, Key>::value,
    "storage's key type differs from table's key type");

static_assert(
    std::is_same<typename TableStorage::value_type, Value>::value,
    "storage's value type differs from table's value type");

// the scratch buffer has to hold at least one counter
static_assert(
    TempMemoryBytes >= sizeof(index_t),
    "temporary storage must at least be of size index_type");

// scratch storage handing out index_t-sized counters
using temp_type = storage::CyclicStore<index_t>;

public:
// ---- public member types ----
using key_type = Key;
using value_type = Value;
using index_type = index_t;
using status_type = Status;
using probing_scheme_type = ProbingScheme;
68
69
/*! \brief key value that marks an unoccupied slot
 * \return the \c EmptyKey template argument
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
static constexpr key_type empty_key() noexcept
{
    return key_type(EmptyKey);
}
77
78
/*! \brief key value that marks an erased slot
 * \return the \c TombstoneKey template argument
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
static constexpr key_type tombstone_key() noexcept
{
    return key_type(TombstoneKey);
}
86
87
/*! \brief size of the cooperative group that processes one key
 * \return the group size reported by \c ProbingScheme::cg_size()
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
static constexpr index_type cg_size() noexcept
{
    // the table itself has no opinion; the probing scheme dictates the group size
    return ProbingScheme::cg_size();
}
95
96
/*! \brief constructor
 * \note if the counter of unique keys cannot be allocated the table reports
 *       \c out_of_memory and stays uninitialized
 * \param[in] min_capacity minimum number of slots in the hash table
 * \param[in] seed random seed
 * \param[in] max_values_per_key maximum number of values to store per key
 * \param[in] no_init if \c true the call to \c init() is skipped
 */
HOSTQUALIFIER INLINEQUALIFIER
explicit MultiValueHashTable(
    const index_type min_capacity,
    const key_type seed = defaults::seed<key_type>(),
    const index_type max_values_per_key =
        std::numeric_limits<index_type>::max(),
    const bool no_init = false) noexcept :
    status_(nullptr),
    table_(detail::get_valid_capacity(min_capacity, cg_size())),
    temp_(TempMemoryBytes / sizeof(index_type)),
    seed_(seed),
    max_values_per_key_(max_values_per_key),
    num_keys_(nullptr),
    is_copy_(false),
    is_initialized_(false)
{
    // a failed allocation must leave a null pointer behind, because every
    // later access is guarded by a nullptr check
    if(cudaMalloc(&status_, sizeof(status_type)) != cudaSuccess)
    {
        status_ = nullptr;
    }

    if(cudaMalloc(&num_keys_, sizeof(index_type)) != cudaSuccess)
    {
        num_keys_ = nullptr;
    }

    // start with the combined status of both storage members
    assign_status(table_.status() + temp_.status());

    if(num_keys_ == nullptr)
    {
        // insert() increments *num_keys_ on the device, so the table is unusable
        join_status(status_type::out_of_memory());
        return;
    }

    if(!no_init) init();
}
125
126
/*! \brief copy-constructor (shallow)
 * \note the new object shares all pointers with \c o and is flagged as a copy,
 *       so its destructor does not free them
 * \param[in] o object to be copied
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
MultiValueHashTable(const MultiValueHashTable& o) noexcept
:   status_(o.status_)
,   table_(o.table_)
,   temp_(o.temp_)
,   seed_(o.seed_)
,   max_values_per_key_(o.max_values_per_key_)
,   num_keys_(o.num_keys_)
,   is_copy_(true) // never owning, regardless of the source's flag
,   is_initialized_(o.is_initialized_)
{
}
140
141
/*! \brief move-constructor
 * \note ownership of the status and key-count buffers follows the
 *       \c is_copy_ flag of \c o ; afterwards \c o is flagged as a copy
 * \param[in] o object to be moved
 */
HOSTQUALIFIER INLINEQUALIFIER
MultiValueHashTable(MultiValueHashTable&& o) noexcept
:   status_(std::move(o.status_))
,   table_(std::move(o.table_))
,   temp_(std::move(o.temp_))
,   seed_(std::move(o.seed_))
,   max_values_per_key_(std::move(o.max_values_per_key_))
,   num_keys_(std::move(o.num_keys_))
,   is_copy_(std::move(o.is_copy_))
,   is_initialized_(std::move(o.is_initialized_))
{
    // the moved-from object must not free the buffers in its destructor
    o.is_copy_ = true;
}
157
158
#ifndef __CUDA_ARCH__
/*! \brief destructor
 * \note only compiled for the host pass; objects flagged as copies free nothing
 */
HOSTQUALIFIER INLINEQUALIFIER
~MultiValueHashTable() noexcept
{
    // objects flagged as copies do not own the buffers
    if(is_copy_) return;

    if(status_ != nullptr)
    {
        cudaFree(status_);
    }

    if(num_keys_ != nullptr)
    {
        cudaFree(num_keys_);
    }
}
#endif
171
172
/*! \brief (re)initialize the hash table
 * \note the table stays uninitialized if either storage member reports
 *       \c not_initialized, if the unique-key counter was never allocated,
 *       or if resetting that counter fails
 * \param[in] stream CUDA stream in which this operation is executed in
 */
HOSTQUALIFIER INLINEQUALIFIER
void init(const cudaStream_t stream = 0) noexcept
{
    is_initialized_ = false;

    const bool storage_ready =
        !table_.status().has_not_initialized() &&
        !temp_.status().has_not_initialized();

    // device-side insert() increments *num_keys_, so a missing counter
    // must prevent the table from being marked as usable
    if(!storage_ready || num_keys_ == nullptr) return;

    // mark every slot as empty
    table_.init_keys(empty_key(), stream);

    // discard status flags of earlier operations
    assign_status(table_.status() + temp_.status(), stream);

    // reset the count of unique keys
    if(cudaMemsetAsync(num_keys_, 0, sizeof(index_type), stream) != cudaSuccess)
    {
        return;
    }

    is_initialized_ = true;
}
192
193
/*! \brief inserts a key/value pair into the hash table
 * \note has to be called by all threads of \c group with identical arguments
 * \note the probing length is clamped to \c capacity(), matching \c for_each,
 *       so insertion never probes further than retrieval does
 * \param[in] key_in key to insert into the hash table
 * \param[in] value_in value that corresponds to \c key_in
 * \param[in] group cooperative group
 * \param[in] probing_length maximum number of probing attempts
 * \return status (per thread)
 */
DEVICEQUALIFIER INLINEQUALIFIER
status_type insert(
    const key_type key_in,
    const value_type& value_in,
    const cg::thread_block_tile<cg_size()>& group,
    const index_type probing_length = defaults::probing_length()) noexcept
{
    if(!is_initialized_)
    {
        return status_type::not_initialized();
    }

    // the sentinel keys cannot be stored
    if(!is_valid_key(key_in))
    {
        device_join_status(status_type::invalid_key());
        return status_type::invalid_key();
    }

    // same bound as in for_each(): with the unclamped default length a table
    // without free slots would be probed far beyond its capacity
    ProbingScheme iter(capacity(), min(probing_length, capacity()), group);

    // number of slots seen so far that already hold key_in
    index_type num_values = 0;

    for(index_type i = iter.begin(key_in, seed_); i != iter.end(); i = iter.next())
    {
        const key_type table_key = table_[i].key;

        // lanes whose slot is free
        auto empty_mask = group.ballot(is_empty_key(table_key));

        // count the lanes whose slot already stores key_in
        num_values += __popc(group.ballot(table_key == key_in));

        if(num_values >= max_values_per_key_)
        {
            status_type status = status_type::duplicate_key() +
                status_type::max_values_for_key_reached();
            device_join_status(status);
            return status;
        }

        bool success = false; // no hash collision

        // try the free slots of this window one after another
        while(empty_mask)
        {
            bool key_collision = false;

            // lowest lane that still owns a candidate slot
            const auto leader = ffs(empty_mask) - 1;

            if(group.thread_rank() == leader)
            {
                // claim the slot; fails if another group was faster
                const auto old =
                    atomicCAS(&(table_[i].key), table_key, key_in);

                success = (old == table_key);
                // the competing writer stored the same key
                key_collision = (old == key_in);

                if(success)
                {
                    table_[i].value = value_in;

                    // no earlier slot held key_in: count it as a new unique key
                    if(num_values == 0)
                    {
                        helpers::atomicAggInc(num_keys_);
                    }
                }
            }

            if(group.any(success))
            {
                return (num_values > 0) ?
                    status_type::duplicate_key() : status_type::none();
            }

            // the lost slot now holds one more value of key_in
            num_values += group.any(key_collision);

            if(num_values >= max_values_per_key_)
            {
                status_type status = status_type::duplicate_key() +
                    status_type::max_values_for_key_reached();
                device_join_status(status);
                return status;
            }

            // drop the leader's slot from the candidate set
            empty_mask ^= 1UL << leader;
        }
    }

    // the probing sequence ended without finding a free slot
    status_type status = (num_values > 0) ?
        status_type::probing_length_exceeded() + status_type::duplicate_key() :
        status_type::probing_length_exceeded();
    device_join_status(status);
    return status;
}
290
291
/*! \brief insert a set of keys into the hash table
 * \note does nothing if the table is not initialized or \c num_in is zero
 * \tparam StatusHandler handles returned status per key (see \c status_handlers)
 * \param[in] keys_in pointer to keys to insert into the hash table
 * \param[in] values_in values corresponding to \c keys_in
 * \param[in] num_in number of keys to insert
 * \param[in] stream CUDA stream in which this operation is executed in
 * \param[in] probing_length maximum number of probing attempts
 * \param[out] status_out status information per key
 */
template<class StatusHandler = defaults::status_handler_t>
HOSTQUALIFIER INLINEQUALIFIER
void insert(
    const key_type * const keys_in,
    const value_type * const values_in,
    const index_type num_in,
    const cudaStream_t stream = 0,
    const index_type probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr) noexcept
{
    static_assert(
        checks::is_status_handler<StatusHandler>(),
        "not a valid status handler type");

    if(!is_initialized_) return;

    // a grid of zero blocks is an invalid launch configuration
    if(num_in == 0) return;

    // cg_size() threads are launched per input key
    kernels::insert<MultiValueHashTable, StatusHandler>
    <<<SDIV(num_in * cg_size(), WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, 0, stream>>>
    (keys_in, values_in, num_in, *this, probing_length, status_out);
}
320
321
/*! \brief retrieves all values to a corresponding key
 * \note if \c values_out is \c nullptr only the number of values is
 *       determined and \c dry_run is added to the returned status
 * \param[in] key_in key to retrieve from the hash table
 * \param[out] values_out values for \c key_in
 * \param[out] num_out number of retrieved values
 * \param[in] group cooperative group
 * \param[in] probing_length maximum number of probing attempts
 * \return status (per thread)
 */
DEVICEQUALIFIER INLINEQUALIFIER
status_type retrieve(
    const key_type key_in,
    value_type * const values_out,
    index_type& num_out,
    const cg::thread_block_tile<cg_size()>& group,
    const index_type probing_length = defaults::probing_length()) const noexcept
{
    if(values_out != nullptr)
    {
        // regular mode: store each value at the index for_each() assigns to it
        return for_each(
            [=, *this] DEVICEQUALIFIER
            (const key_type /* key */, const value_type& value, const index_type index)
            {
                values_out[index] = value;
            },
            key_in,
            num_out,
            group,
            probing_length);
    }

    // dry run: count only and flag the result accordingly
    const auto count_status = num_values(key_in, num_out, group, probing_length);
    device_join_status(status_type::dry_run());
    return status_type::dry_run() + count_status;
}
356
357
/*! \brief retrieve a set of keys from the hash table
 * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided
 * \note \c end_offsets_out can be \c begin_offsets_out+1
 * \note an empty input (\c num_in==0 ) yields \c num_out==0 without touching the output arrays
 * \tparam StatusHandler handles returned status per key (see \c status_handlers)
 * \param[in] keys_in pointer to keys to retrieve from the hash table
 * \param[in] num_in number of keys to retrieve
 * \param[out] begin_offsets_out begin of value range for a corresponding key in \c values_out
 * \param[out] end_offsets_out end of value range for a corresponding key in \c values_out
 * \param[out] values_out retrieved values
 * \param[out] num_out total number of values retrieved by this operation
 * \param[in] stream CUDA stream in which this operation is executed in
 * \param[in] probing_length maximum number of probing attempts
 * \param[out] status_out status information (per key)
 */
template<class StatusHandler = defaults::status_handler_t>
HOSTQUALIFIER INLINEQUALIFIER
void retrieve(
    const key_type * const keys_in,
    const index_type num_in,
    index_type * const begin_offsets_out,
    index_type * const end_offsets_out,
    value_type * const values_out,
    index_type& num_out,
    const cudaStream_t stream = 0,
    const index_type probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr) const noexcept
{
    static_assert(
        checks::is_status_handler<StatusHandler>(),
        "not a valid status handler type");

    if(!is_initialized_) return;

    // cub::DeviceScan::InclusiveSum takes input sizes of type int
    if(num_in > index_type(std::numeric_limits<int>::max()))
    {
        join_status(status_type::index_overflow(), stream);

        return;
    }

    // nothing to look up: avoids zero-block launches, the write to
    // begin_offsets_out[0] and the underflow of (num_in - 1) below
    if(num_in == 0)
    {
        num_out = 0;

        if(values_out == nullptr)
        {
            join_status(status_type::dry_run(), stream);
        }

        return;
    }

    // step 1: per-key value counts go to end_offsets_out, the total to num_out
    num_values(
        keys_in,
        num_in,
        num_out,
        end_offsets_out,
        stream,
        probing_length);

    if(values_out != nullptr)
    {
        std::size_t required_temp_bytes = 0;

        // size query only (null temp storage)
        cub::DeviceScan::InclusiveSum(
            nullptr,
            required_temp_bytes,
            end_offsets_out,
            end_offsets_out,
            num_in,
            stream);

        // num_out is written by an asynchronous copy; wait before reading it
        cudaStreamSynchronize(stream);
        std::size_t available_temp_bytes_from_outputbuffer = num_out * sizeof(value_type);

        // step 2: turn the counts into end offsets (inclusive prefix sum)
        if(available_temp_bytes_from_outputbuffer >= required_temp_bytes)
        {
            // fast path: the not yet filled value buffer serves as scratch space
            cub::DeviceScan::InclusiveSum(
                values_out,
                available_temp_bytes_from_outputbuffer,
                end_offsets_out,
                end_offsets_out,
                num_in,
                stream);
        }
        else
        {
            //slow path, need extra memory. cub caching allocator???
            void* cubtemp = nullptr;
            const cudaError_t err = cudaMalloc(&cubtemp, required_temp_bytes);

            if(err != cudaSuccess)
            {
                join_status(status_type::out_of_memory(), stream);
                num_out = 0;

                return;
            }

            cub::DeviceScan::InclusiveSum(
                cubtemp,
                required_temp_bytes,
                end_offsets_out,
                end_offsets_out,
                num_in,
                stream);

            cudaFree(cubtemp);
        }

        // step 3: begin offsets are the end offsets shifted by one, starting at 0
        cudaMemsetAsync(begin_offsets_out, 0, sizeof(index_type), stream);

        // with the overlapping layout the shifted copy already exists
        if(end_offsets_out != begin_offsets_out + 1)
        {
            cudaMemcpyAsync(
                begin_offsets_out + 1,
                end_offsets_out,
                sizeof(index_type) * (num_in - 1),
                D2D,
                stream);
        }

        // step 4: gather the values, cg_size() threads per key
        kernels::retrieve<MultiValueHashTable, StatusHandler>
        <<<SDIV(num_in * cg_size(), WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, 0, stream>>>
        (
            keys_in,
            num_in,
            begin_offsets_out,
            end_offsets_out,
            values_out,
            *this,
            probing_length,
            status_out);
    }
    else
    {
        // dry run: report dry_run for every key if per-key status was requested
        if(status_out != nullptr)
        {
            helpers::lambda_kernel
            <<<SDIV(num_in, WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, 0, stream>>>
            ([=, *this] DEVICEQUALIFIER
            {
                const index_type tid = helpers::global_thread_id();

                if(tid < num_in)
                {
                    StatusHandler::handle(Status::dry_run(), status_out, tid);
                }
            });
        }

        join_status(status_type::dry_run(), stream);
    }

    if(stream == 0)
    {
        cudaStreamSynchronize(stream);
    }
}
511
512
/*! \brief retrieves all elements from the hash table
 * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided
 * \note this method implements a multi-stage dry-run mode: if \c keys_out is \c nullptr only \c num_keys_out is determined
 * \param[out] keys_out pointer to the set of unique keys
 * \param[out] num_keys_out number of unique keys
 * \param[out] begin_offsets_out begin of value range for a corresponding key in \c values_out
 * \param[out] end_offsets_out end of value range for a corresponding key in \c values_out
 * \param[out] values_out array which holds all retrieved values
 * \param[out] num_values_out total number of values retrieved by this operation
 * \param[in] stream CUDA stream in which this operation is executed in
 */
HOSTQUALIFIER INLINEQUALIFIER
void retrieve_all(
    key_type * const keys_out,
    index_type& num_keys_out,
    index_type * const begin_offsets_out,
    index_type * const end_offsets_out,
    value_type * const values_out,
    // a count, hence index_type: it is forwarded to retrieve(), which takes
    // index_type& and cannot bind to any other type
    index_type& num_values_out,
    const cudaStream_t stream = 0) const noexcept
{
    if(!is_initialized_) return;

    // stage 1: unique keys (or only their number if keys_out is null)
    retrieve_all_keys(keys_out, num_keys_out, stream);

    if(keys_out != nullptr)
    {
        // num_keys_out is filled by an asynchronous copy which is only waited
        // for on the default stream; it is read on the host right below
        cudaStreamSynchronize(stream);

        // stage 2: values of all collected keys
        retrieve(
            keys_out,
            num_keys_out,
            begin_offsets_out,
            end_offsets_out,
            values_out,
            num_values_out,
            stream);
    }

    if(stream == 0)
    {
        cudaStreamSynchronize(stream);
    }
}
554
555
/*! \brief retrieve all unique keys
 * \note this method has a dry-run mode where it only calculates the needed array sizes in case no memory (aka \c nullptr ) is provided
 * \param[out] keys_out retrieved unique keys
 * \param[out] num_out number of unique keys
 * \param[in] stream CUDA stream in which this operation is executed in
 */
HOSTQUALIFIER INLINEQUALIFIER
void retrieve_all_keys(
    key_type * const keys_out,
    index_type& num_out,
    const cudaStream_t stream = 0) const noexcept
{
    if(!is_initialized_) return;

    // dry run: report the stored key count only
    if(keys_out == nullptr)
    {
        num_out = num_keys(stream);
        join_status(status_type::dry_run(), stream);
        return;
    }

    // device counter that hands out the output positions
    index_type * const counter = temp_.get();
    cudaMemsetAsync(counter, 0, sizeof(index_type), stream);

    kernels::for_each_unique_key
    <<<SDIV(capacity()*cg_size(), MAXBLOCKSIZE), MAXBLOCKSIZE, 0, stream>>>
    ([=] DEVICEQUALIFIER (const key_type& key)
    {
        // reserve the next free output position for this key
        const index_type position = helpers::atomicAggInc(counter);
        keys_out[position] = key;
    }, *this);

    // the final counter value is the number of keys written
    cudaMemcpyAsync(&num_out, counter, sizeof(index_type), D2H, stream);

    if(stream == 0)
    {
        cudaStreamSynchronize(stream);
    }
}
595
596
/*! \brief applies a function over all values of a specified key
 * \note \c f is invoked as \c f(key_in,value,index) by the thread that found the value
 * \tparam Func type of map i.e. CUDA device lambda
 * \param[in] f map to apply
 * \param[in] key_in key to consider
 * \param[out] num_values_out number of values associated to \c key_in
 * \param[in] group cooperative group
 * \param[in] probing_length maximum number of probing attempts
 * \return status (per thread)
 */
template<class Func>
DEVICEQUALIFIER INLINEQUALIFIER
status_type for_each(
    Func f,
    const key_type key_in,
    index_type& num_values_out,
    const cg::thread_block_tile<cg_size()>& group,
    const index_type probing_length = defaults::probing_length()) const noexcept
{
    if(!is_initialized_)
    {
        return status_type::not_initialized();
    }

    if(!is_valid_key(key_in))
    {
        num_values_out = 0;
        device_join_status(status_type::invalid_key());
        return status_type::invalid_key();
    }

    ProbingScheme iter(capacity(), min(probing_length, capacity()), group);

    // matches found in all windows visited so far
    index_type found = 0;

    index_type slot = iter.begin(key_in, seed_);

    while(slot != iter.end())
    {
        const auto slot_key = table_[slot].key;
        const auto is_match = (slot_key == key_in);
        const auto match_mask = group.ballot(is_match);

        if(is_match)
        {
            // index of this value: earlier matches plus matches in lower lanes
            const auto j =
                found + __popc(match_mask & ((1UL << group.thread_rank()) - 1));

            f(key_in, table_[slot].value, j);
        }

        found += __popc(match_mask);

        // stop once any lane sees an empty slot or the per-key limit is reached
        if(group.any(is_empty_key(slot_key) || found >= max_values_per_key_))
        {
            num_values_out = found;

            if(found != 0)
            {
                return status_type::none();
            }

            device_join_status(status_type::key_not_found());
            return status_type::key_not_found();
        }

        slot = iter.next();
    }

    // probing sequence exhausted before a stop condition was met
    num_values_out = found;
    device_join_status(status_type::probing_length_exceeded());
    return status_type::probing_length_exceeded();
}
662
663
/*! \brief applies a function over all key value pairs inside the table
 * \tparam Func type of map i.e. CUDA device lambda
 * \param[in] f map to apply
 * \param[in] stream CUDA stream in which this operation is executed in
 * \param[in] smem_bytes size of dynamic shared memory to reserve for this execution
 */
template<class Func>
HOSTQUALIFIER INLINEQUALIFIER
void for_each(
    Func f, // TODO const?
    const cudaStream_t stream = 0,
    const index_type smem_bytes = 0) const noexcept
{
    if(is_initialized_)
    {
        // the launch provides one thread per table slot
        kernels::for_each<Func, MultiValueHashTable>
        <<<SDIV(capacity(), WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, smem_bytes, stream>>>
        (f, *this);
    }
}
682
683
/*! \brief applies a function over all values of a set of keys
 * \note does nothing if the table is not initialized or \c num_in is zero
 * \tparam Func type of map i.e. CUDA device lambda
 * \tparam StatusHandler handles returned status per key (see \c status_handlers)
 * \param[in] f map to apply
 * \param[in] keys_in keys to consider
 * \param[in] num_in number of keys
 * \param[in] stream CUDA stream in which this operation is executed in
 * \param[in] probing_length maximum number of probing attempts
 * \param[out] status_out status information (per key)
 * \param[in] smem_bytes size of dynamic shared memory to reserve for this execution
 */
template<class Func, class StatusHandler = defaults::status_handler_t>
HOSTQUALIFIER INLINEQUALIFIER
void for_each(
    Func f, // TODO const?
    const key_type * const keys_in,
    const index_type num_in,
    const cudaStream_t stream = 0,
    const index_type probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr,
    const index_type smem_bytes = 0) const noexcept
{
    static_assert(
        checks::is_status_handler<StatusHandler>(),
        "not a valid status handler type");

    if(!is_initialized_) return;

    // a grid of zero blocks is an invalid launch configuration
    if(num_in == 0) return;

    // NOTE(review): probing_length is accepted but not handed to the kernel;
    // the kernel's parameter list is not visible from this file, so the call
    // arguments are left unchanged - confirm and forward it if supported

    // the grid is sized by the number of keys (cg_size() threads per key),
    // like every other per-key launch of this class; sizing it by capacity()
    // leaves keys unprocessed whenever num_in * cg_size() exceeds capacity()
    kernels::for_each<Func, MultiValueHashTable>
    <<<SDIV(num_in * cg_size(), WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, smem_bytes, stream>>>
    (f, keys_in, num_in, *this, status_out);
}
715
716
/*! \brief number of unique keys inside the table
 * \note blocks until \c stream has finished the copy
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return number of unique keys, \c 0 if the table is not initialized
 */
HOSTQUALIFIER INLINEQUALIFIER
index_type num_keys(const cudaStream_t stream = 0) const noexcept
{
    // the counter is only zeroed by init(); before that it holds no valid
    // count (or was never allocated), so report an empty table like size()
    if(!is_initialized_) return 0;

    index_type num = 0;

    cudaMemcpyAsync(&num, num_keys_, sizeof(index_type), D2H, stream);

    // the result is returned by value, so the copy has to be complete
    cudaStreamSynchronize(stream);

    return num;
}
731
732
/*! \brief number of values associated to a key
 * \param[in] key_in key to be probed
 * \param[out] num_out number of values associated to \c key_in
 * \param[in] group cooperative group
 * \param[in] probing_length maximum number of probing attempts
 * \return status (per thread)
 */
DEVICEQUALIFIER INLINEQUALIFIER
status_type num_values(
    const key_type key_in,
    index_type& num_out,
    const cg::thread_block_tile<cg_size()>& group,
    const index_type probing_length = defaults::probing_length()) const noexcept
{
    // counting is a traversal with a no-op visitor; for_each() reports the count
    return for_each(
        [=] DEVICEQUALIFIER (
            const key_type /* key */,
            const value_type& /* value */,
            const index_type /* index */)
        {
        },
        key_in,
        num_out,
        group,
        probing_length);
}
755
756
/*! \brief number of values associated to a set of keys
 * \note this function returns only \c num_out if \c num_per_key_out==nullptr
 * \note \c num_out is written by an asynchronous copy which is only waited for on the default stream
 * \note an empty input (\c num_in==0 ) yields \c num_out==0 without launching a kernel
 * \tparam StatusHandler handles returned status per key (see \c status_handlers)
 * \param[in] keys_in keys to consider
 * \param[in] num_in number of keys
 * \param[out] num_out total number of values
 * \param[out] num_per_key_out number of values per key
 * \param[in] stream CUDA stream in which this operation is executed in
 * \param[in] probing_length maximum number of probing attempts
 * \param[out] status_out status information (per key)
 */
template<class StatusHandler = defaults::status_handler_t>
HOSTQUALIFIER INLINEQUALIFIER
void num_values(
    const key_type * const keys_in,
    const index_type num_in,
    index_type& num_out,
    index_type * const num_per_key_out = nullptr,
    const cudaStream_t stream = 0,
    const index_type probing_length = defaults::probing_length(),
    typename StatusHandler::base_type * const status_out = nullptr) const noexcept
{
    if(!is_initialized_) return;

    // a grid of zero blocks is an invalid launch configuration
    if(num_in == 0)
    {
        num_out = 0;
        return;
    }

    // TODO check if shared memory is beneficial

    // device-side accumulator for the total
    index_type * const tmp = temp_.get();
    cudaMemsetAsync(tmp, 0, sizeof(index_type), stream);

    // cg_size() threads are launched per input key
    kernels::num_values<MultiValueHashTable, StatusHandler>
    <<<SDIV(num_in * cg_size(), WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, 0, stream>>>
    (keys_in, num_in, tmp, num_per_key_out, *this, probing_length, status_out);

    cudaMemcpyAsync(&num_out, tmp, sizeof(index_type), D2H, stream);

    if(stream == 0)
    {
        cudaStreamSynchronize(stream);
    }
}
796
797
/*! \brief number of values stored inside the hash table
 * \note alias for \c size()
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return the number of values
 */
HOSTQUALIFIER INLINEQUALIFIER
index_type num_values(const cudaStream_t stream = 0) const noexcept
{
    const index_type total = size(stream);
    return total;
}
807
808
/*! \brief number of values stored inside the hash table
 * \note blocks until \c stream has finished the count
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return the number of values, \c 0 if the table is not initialized
 */
HOSTQUALIFIER INLINEQUALIFIER
index_type size(const cudaStream_t stream = 0) const noexcept
{
    if(!is_initialized_) return 0;

    // zero-initialized so that a failed copy does not return an indeterminate value
    index_type out = 0;
    index_type * const tmp = temp_.get();

    // device-side accumulator
    cudaMemsetAsync(tmp, 0, sizeof(index_type), stream);

    // the launch provides one thread per table slot
    kernels::size
    <<<SDIV(capacity(), WARPCORE_BLOCKSIZE), WARPCORE_BLOCKSIZE, 0, stream>>>
    (tmp, *this);

    cudaMemcpyAsync(
        &out,
        tmp,
        sizeof(index_type),
        D2H,
        stream);

    // the result is returned by value, so the copy has to be complete
    cudaStreamSynchronize(stream);

    return out;
}
837
838
/*! \brief current load factor of the hash table
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return number of stored values divided by the number of slots
 */
HOSTQUALIFIER INLINEQUALIFIER
float load_factor(const cudaStream_t stream = 0) const noexcept
{
    const float stored = float(size(stream));
    const float slots = float(capacity());

    return stored / slots;
}
847
848
/*! \brief current storage density of the hash table
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return bytes of unique keys plus bytes of values, divided by the table's total bytes
 */
HOSTQUALIFIER INLINEQUALIFIER
float storage_density(const cudaStream_t stream = 0) const noexcept
{
    // payload: every unique key counted once plus every stored value
    const index_type bytes_of_keys = num_keys(stream) * sizeof(key_type);
    const index_type bytes_of_values = num_values(stream) * sizeof(value_type);
    const index_type payload_bytes = bytes_of_keys + bytes_of_values;

    const index_type allocated_bytes = table_.bytes_total();

    return float(payload_bytes) / float(allocated_bytes);
}
860
861
/*! \brief get the capacity of the hash table
 * \return number of slots as reported by the underlying storage
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
index_type capacity() const noexcept
{
    const index_type num_slots = table_.capacity();
    return num_slots;
}
869
870
/*! \brief indicates if the hash table is properly initialized
 * \return \c true iff the last call to \c init() succeeded
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
bool is_initialized() const noexcept
{
    const bool ready = is_initialized_;
    return ready;
}
878
879
/*! \brief get the status of the hash table
 * \note blocks until \c stream has finished the copy
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return the status, \c not_initialized if no status buffer exists
 */
HOSTQUALIFIER INLINEQUALIFIER
status_type peek_status(const cudaStream_t stream = 0) const noexcept
{
    status_type current = status_type::not_initialized();

    // without a status buffer there is nothing to read
    if(status_ == nullptr)
    {
        return current;
    }

    cudaMemcpyAsync(
        &current,
        status_,
        sizeof(status_type),
        D2H,
        stream);

    cudaStreamSynchronize(stream);

    return current;
}
902
903
/*! \brief get and reset the status of the hash table
 * \note the stored status is replaced by the status of the table storage
 * \param[in] stream CUDA stream in which this operation is executed in
 * \return the status before the reset, \c not_initialized if no status buffer exists
 */
HOSTQUALIFIER INLINEQUALIFIER
status_type pop_status(const cudaStream_t stream = 0) noexcept
{
    status_type previous = status_type::not_initialized();

    if(status_ == nullptr)
    {
        return previous;
    }

    cudaMemcpyAsync(
        &previous,
        status_,
        sizeof(status_type),
        D2H,
        stream);

    // assign_status() synchronizes the stream after its own copy
    assign_status(table_.status(), stream);

    return previous;
}
926
927
/*! \brief checks if \c key is equal to \c EmptyKey
 * \param[in] key key to test
 * \return \c true iff \c key is the empty-slot sentinel
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
static constexpr bool is_empty_key(const key_type key) noexcept
{
    return (empty_key() == key);
}
935
936
/*! \brief checks if \c key is equal to \c TombstoneKey
 * \param[in] key key to test
 * \return \c true iff \c key is the erased-slot sentinel
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
static constexpr bool is_tombstone_key(const key_type key) noexcept
{
    return (tombstone_key() == key);
}
944
945
/*! \brief checks if \c key can be stored, i.e. is neither \c EmptyKey nor \c TombstoneKey
 * \param[in] key key to test
 * \return \c true iff \c key differs from both sentinel keys
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
static constexpr bool is_valid_key(const key_type key) noexcept
{
    return !(key == empty_key() || key == tombstone_key());
}
953
954
/*! \brief indicates if this object is a shallow copy
 * \return \c true iff the object is flagged as a copy
 */
HOSTDEVICEQUALIFIER INLINEQUALIFIER
bool is_copy() const noexcept
{
    const bool shallow = is_copy_;
    return shallow;
}
962
963
private
:
964
/*! \brief overwrites the hash table's status
 * \note \c const on purpose
 * \note blocks until \c stream has finished the copy; no-op without a status buffer
 * \param[in] status new status
 * \param[in] stream CUDA stream in which this operation is executed in
 */
HOSTQUALIFIER INLINEQUALIFIER
void assign_status(
    const status_type status,
    const cudaStream_t stream = 0) const noexcept
{
    if(status_ == nullptr) return;

    cudaMemcpyAsync(
        status_,
        &status,
        sizeof(status_type),
        H2D,
        stream);

    // the source is a function parameter, so wait before it goes out of scope
    cudaStreamSynchronize(stream);
}
986
987
/*! \brief joins additional flags to the hash table's status
 * \note \c const on purpose
 * \note read-modify-write from the host; nothing is written if no new flag is added
 * \param[in] status flags to add
 * \param[in] stream CUDA stream in which this operation is executed in
 */
HOSTQUALIFIER INLINEQUALIFIER
void join_status(
    const status_type status,
    const cudaStream_t stream = 0) const noexcept
{
    if(status_ == nullptr) return;

    status_type before = peek_status(stream);
    const status_type after = before + status;

    // skip the write if the flags are already set
    if(after != before)
    {
        assign_status(after, stream);
    }
}
1008
1009
/*! \brief joins additional flags to the hash table's status (device side)
 * \note \c const on purpose
 * \note no-op without a status buffer
 * \param[in] status flags to add
 */
DEVICEQUALIFIER INLINEQUALIFIER
void device_join_status(const status_type status) const noexcept
{
    if(status_ == nullptr) return;

    status_->atomic_join(status);
}
1021
1022
// NOTE: the declaration order below is the initialization order used by all constructors
status_type * status_;          //!< pointer to status
TableStorage table_;            //!< actual key/value storage
temp_type temp_;                //!< temporary memory
key_type seed_;                 //!< random seed
index_type max_values_per_key_; //!< maximum number of values to store per key
index_type * num_keys_;         //!< pointer to the count of unique keys
bool is_copy_;                  //!< indicates if table is a shallow copy
bool is_initialized_;           //!< indicates if table is properly initialized
1030
1031
// ---- kernels that are granted access to the private members ----

// counts the stored values
template<class Core>
GLOBALQUALIFIER
friend void kernels::size(index_type * const, const Core);

// visits all key/value pairs of the table
template<class Func, class Core>
GLOBALQUALIFIER
friend void kernels::for_each(Func, const Core);

// visits every unique key
template<class Func, class Core>
GLOBALQUALIFIER
friend void kernels::for_each_unique_key(Func, const Core);

// gathers the values of a set of keys
template<class Core, class StatusHandler>
GLOBALQUALIFIER
friend void kernels::retrieve(
    const typename Core::key_type * const,
    const index_type,
    const index_type * const,
    const index_type * const,
    typename Core::value_type * const,
    const Core,
    const index_type,
    typename StatusHandler::base_type * const);
1054
1055
};
// class MultiValueHashTable
1056
1057
}
// namespace warpcore
1058
1059
#
endif
/* WARPCORE_MULTI_VALUE_HASH_TABLE_CUH */
include
warpcore
multi_value_hash_table.cuh
Generated by
1.9.1