chore(gpu): use active streams in int_radix_lut

This commit is contained in:
Andrei Stoian
2025-09-11 15:40:36 +02:00
parent 1513c3bc8c
commit 3be7aae8f3
6 changed files with 456 additions and 609 deletions

View File

@@ -183,4 +183,93 @@ public:
}
};
struct CudaStreamsBarrier {
private:
std::vector<cudaEvent_t> _events;
CudaStreams _streams;
CudaStreamsBarrier(const CudaStreamsBarrier &) {} // Prevent copy-construction
CudaStreamsBarrier &operator=(const CudaStreamsBarrier &) {
return *this;
} // Prevent assignment
public:
void create_on(const CudaStreams &streams) {
_streams = streams;
GPU_ASSERT(streams.count() > 1, "CudaStreamsFirstWaitsWorkersBarrier: "
"Attempted to create on single GPU");
_events.resize(streams.count());
for (int i = 0; i < streams.count(); i++) {
_events[i] = cuda_create_event(streams.gpu_index(i));
}
}
CudaStreamsBarrier(){};
void local_streams_wait_for_stream_0(const CudaStreams &user_streams) {
GPU_ASSERT(!_events.empty(),
"CudaStreamsBarrier: must call create_on before use");
GPU_ASSERT(user_streams.gpu_index(0) == _streams.gpu_index(0),
"CudaStreamsBarrier: synchronization can only be performed on "
"the GPUs the barrier was initially created on.");
cuda_event_record(_events[0], user_streams.stream(0),
user_streams.gpu_index(0));
for (int j = 1; j < user_streams.count(); j++) {
GPU_ASSERT(user_streams.gpu_index(j) == _streams.gpu_index(j),
"CudaStreamsBarrier: synchronization can only be performed on "
"the GPUs the barrier was initially created on.");
cuda_stream_wait_event(user_streams.stream(j), _events[0],
user_streams.gpu_index(j));
}
}
void stream_0_wait_for_local_streams(const CudaStreams &user_streams) {
GPU_ASSERT(
!_events.empty(),
"CudaStreamsFirstWaitsWorkersBarrier: must call create_on before use");
GPU_ASSERT(
user_streams.count() <= _events.size(),
"CudaStreamsFirstWaitsWorkersBarrier: trying to synchronize too many "
"streams. "
"The barrier was created on a LUT that had %lu active streams, while "
"the user stream set has %u streams",
_events.size(), user_streams.count());
if (user_streams.count() > 1) {
// Worker GPUs record their events
for (int j = 1; j < user_streams.count(); j++) {
GPU_ASSERT(_streams.gpu_index(j) == user_streams.gpu_index(j),
"CudaStreamsBarrier: The user stream "
"set GPU[%d]=%u while the LUT stream set GPU[%d]=%u",
j, user_streams.gpu_index(j), j, _streams.gpu_index(j));
cuda_event_record(_events[j], user_streams.stream(j),
user_streams.gpu_index(j));
}
// GPU 0 waits for all workers
for (int j = 1; j < user_streams.count(); j++) {
cuda_stream_wait_event(user_streams.stream(0), _events[j],
user_streams.gpu_index(0));
}
}
}
void release() {
for (int j = 0; j < _streams.count(); j++) {
cuda_event_destroy(_events[j], _streams.gpu_index(j));
}
_events.clear();
}
~CudaStreamsBarrier() {
GPU_ASSERT(_events.empty(),
"CudaStreamsBarrier: must "
"call release before destruction: events size = %lu",
_events.size());
}
};
#endif

View File

@@ -266,6 +266,11 @@ void cuda_memcpy_with_size_tracking_async_gpu_to_gpu(
uint32_t gpu_index, bool gpu_memory_allocated) {
if (size == 0 || !gpu_memory_allocated)
return;
GPU_ASSERT(dest != nullptr,
"Cuda error: trying to copy gpu->gpu to null ptr");
GPU_ASSERT(src != nullptr,
"Cuda error: trying to copy gpu->gpu from null ptr");
cudaPointerAttributes attr_dest;
check_cuda_error(cudaPointerGetAttributes(&attr_dest, dest));
PANIC_IF_FALSE(

View File

@@ -359,13 +359,9 @@ host_integer_decompress(CudaStreams streams,
std::vector<Torus *> lwe_trivial_indexes_vec =
lut->lwe_trivial_indexes_vec;
/// Make sure all data that should be on GPU 0 is indeed there
cuda_event_record(lut->event_scatter_in, streams.stream(0),
streams.gpu_index(0));
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(j), lut->event_scatter_in,
streams.gpu_index(j));
}
lut->multi_gpu_scatter_barrier.local_streams_wait_for_stream_0(
active_streams);
/// With multiple GPUs we push to the vectors on each GPU then when we
/// gather data to GPU 0 we can copy back to the original indexing
multi_gpu_scatter_lwe_async<Torus>(
@@ -395,15 +391,8 @@ host_integer_decompress(CudaStreams streams,
/// Synchronize all GPUs
// other gpus record their events
for (int j = 1; j < active_streams.count(); j++) {
cuda_event_record(lut->event_scatter_out[j], active_streams.stream(j),
active_streams.gpu_index(j));
}
// GPU 0 waits for all
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(0), lut->event_scatter_out[j],
streams.gpu_index(0));
}
lut->multi_gpu_gather_barrier.stream_0_wait_for_local_streams(
active_streams);
}
} else {
static_assert(std::is_same_v<Torus, __uint128_t>,

View File

@@ -560,12 +560,8 @@ __host__ void integer_radix_apply_univariate_lookup_table_kb(
grouping_factor, num_radix_blocks, pbs_type, num_many_lut, lut_stride);
} else {
/// Make sure all data that should be on GPU 0 is indeed there
cuda_event_record(lut->event_scatter_in, streams.stream(0),
streams.gpu_index(0));
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(j), lut->event_scatter_in,
streams.gpu_index(j));
}
lut->multi_gpu_scatter_barrier.local_streams_wait_for_stream_0(
active_streams);
/// With multiple GPUs we push to the vectors on each GPU then when we
/// gather data to GPU 0 we can copy back to the original indexing
@@ -598,16 +594,8 @@ __host__ void integer_radix_apply_univariate_lookup_table_kb(
lut->lwe_indexes_out, lut->using_trivial_lwe_indexes,
lut->lwe_aligned_vec, num_radix_blocks, big_lwe_dimension + 1);
POP_RANGE()
// other gpus record their events
for (int j = 1; j < active_streams.count(); j++) {
cuda_event_record(lut->event_scatter_out[j], streams.stream(j),
streams.gpu_index(j));
}
// GPU 0 waits for all
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(0), lut->event_scatter_out[j],
streams.gpu_index(0));
}
lut->multi_gpu_gather_barrier.stream_0_wait_for_local_streams(
active_streams);
}
for (uint i = 0; i < num_radix_blocks; i++) {
auto degrees_index = lut->h_lut_indexes[i];
@@ -674,12 +662,9 @@ __host__ void integer_radix_apply_many_univariate_lookup_table_kb(
grouping_factor, num_radix_blocks, pbs_type, num_many_lut, lut_stride);
} else {
/// Make sure all data that should be on GPU 0 is indeed there
cuda_event_record(lut->event_scatter_in, streams.stream(0),
streams.gpu_index(0));
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(j), lut->event_scatter_in,
streams.gpu_index(j));
}
lut->multi_gpu_scatter_barrier.local_streams_wait_for_stream_0(
active_streams);
/// With multiple GPUs we push to the vectors on each GPU then when we
/// gather data to GPU 0 we can copy back to the original indexing
PUSH_RANGE("scatter")
@@ -712,16 +697,8 @@ __host__ void integer_radix_apply_many_univariate_lookup_table_kb(
num_radix_blocks, big_lwe_dimension + 1, num_many_lut);
POP_RANGE()
// other gpus record their events
for (int j = 1; j < active_streams.count(); j++) {
cuda_event_record(lut->event_scatter_out[j], streams.stream(j),
streams.gpu_index(j));
}
// GPU 0 waits for all
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(0), lut->event_scatter_out[j],
streams.gpu_index(0));
}
lut->multi_gpu_gather_barrier.stream_0_wait_for_local_streams(
active_streams);
}
for (uint i = 0; i < lwe_array_out->num_radix_blocks; i++) {
auto degrees_index = lut->h_lut_indexes[i % lut->num_blocks];
@@ -802,12 +779,9 @@ __host__ void integer_radix_apply_bivariate_lookup_table_kb(
small_lwe_dimension, polynomial_size, pbs_base_log, pbs_level,
grouping_factor, num_radix_blocks, pbs_type, num_many_lut, lut_stride);
} else {
cuda_event_record(lut->event_scatter_in, streams.stream(0),
streams.gpu_index(0));
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(j), lut->event_scatter_in,
streams.gpu_index(j));
}
lut->multi_gpu_scatter_barrier.local_streams_wait_for_stream_0(
active_streams);
PUSH_RANGE("scatter")
multi_gpu_scatter_lwe_async<Torus>(
active_streams, lwe_array_in_vec, (Torus *)lwe_array_pbs_in->ptr,
@@ -837,16 +811,8 @@ __host__ void integer_radix_apply_bivariate_lookup_table_kb(
lut->lwe_indexes_out, lut->using_trivial_lwe_indexes,
lut->lwe_aligned_vec, num_radix_blocks, big_lwe_dimension + 1);
POP_RANGE()
// other gpus record their events
for (int j = 1; j < active_streams.count(); j++) {
cuda_event_record(lut->event_scatter_out[j], streams.stream(j),
streams.gpu_index(j));
}
// GPU 0 waits for all
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(0), lut->event_scatter_out[j],
streams.gpu_index(0));
}
lut->multi_gpu_gather_barrier.stream_0_wait_for_local_streams(
active_streams);
}
for (uint i = 0; i < num_radix_blocks; i++) {
auto degrees_index = lut->h_lut_indexes[i];
@@ -2399,7 +2365,7 @@ __host__ void integer_radix_apply_noise_squashing_kb(
execute_pbs_async<uint64_t, __uint128_t>(
streams.get_ith(0), (__uint128_t *)lwe_array_out->ptr,
lwe_trivial_indexes_vec[0], lut->lut_vec, lwe_trivial_indexes_vec,
lwe_after_ks_vec[0], lwe_trivial_indexes_vec[0], bsks, lut->pbs_buffer,
lwe_after_ks_vec[0], lwe_trivial_indexes_vec[0], bsks, lut->buffer,
glwe_dimension, small_lwe_dimension, polynomial_size, pbs_base_log,
pbs_level, grouping_factor, lwe_array_out->num_radix_blocks,
params.pbs_type, 0, 0);
@@ -2426,7 +2392,7 @@ __host__ void integer_radix_apply_noise_squashing_kb(
execute_pbs_async<uint64_t, __uint128_t>(
active_streams, lwe_after_pbs_vec, lwe_trivial_indexes_vec,
lut->lut_vec, lwe_trivial_indexes_vec, lwe_after_ks_vec,
lwe_trivial_indexes_vec, bsks, lut->pbs_buffer, glwe_dimension,
lwe_trivial_indexes_vec, bsks, lut->buffer, glwe_dimension,
small_lwe_dimension, polynomial_size, pbs_base_log, pbs_level,
grouping_factor, lwe_array_out->num_radix_blocks, params.pbs_type, 0,
0);

View File

@@ -45,12 +45,8 @@ void host_integer_grouped_oprf(CudaStreams streams,
std::vector<Torus *> lwe_after_pbs_vec = lut->lwe_after_pbs_vec;
std::vector<Torus *> lwe_trivial_indexes_vec = lut->lwe_trivial_indexes_vec;
cuda_event_record(lut->event_scatter_in, streams.stream(0),
streams.gpu_index(0));
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(j), lut->event_scatter_in,
streams.gpu_index(j));
}
lut->multi_gpu_scatter_barrier.local_streams_wait_for_stream_0(
active_streams);
PUSH_RANGE("scatter")
multi_gpu_scatter_lwe_async<Torus>(
@@ -76,16 +72,8 @@ void host_integer_grouped_oprf(CudaStreams streams,
lut->lwe_aligned_vec, num_blocks_to_process,
mem_ptr->params.big_lwe_dimension + 1);
POP_RANGE()
// other gpus record their events
for (int j = 1; j < active_streams.count(); j++) {
cuda_event_record(lut->event_scatter_out[j], streams.stream(j),
streams.gpu_index(j));
}
// GPU 0 waits for all
for (int j = 1; j < active_streams.count(); j++) {
cuda_stream_wait_event(streams.stream(0), lut->event_scatter_out[j],
streams.gpu_index(0));
}
lut->multi_gpu_gather_barrier.stream_0_wait_for_local_streams(
active_streams);
}
for (uint32_t i = 0; i < num_blocks_to_process; i++) {