diff --git a/compilers/concrete-compiler/compiler/include/concretelang/Runtime/stream_emulator_api.h b/compilers/concrete-compiler/compiler/include/concretelang/Runtime/stream_emulator_api.h index 04ad646e8..2f6f4ede0 100644 --- a/compilers/concrete-compiler/compiler/include/concretelang/Runtime/stream_emulator_api.h +++ b/compilers/concrete-compiler/compiler/include/concretelang/Runtime/stream_emulator_api.h @@ -36,12 +36,12 @@ void stream_emulator_make_memref_negate_lwe_ciphertext_u64_process(void *dfg, void *sout); void stream_emulator_make_memref_keyswitch_lwe_u64_process( void *dfg, void *sin1, void *sout, uint32_t level, uint32_t base_log, - uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t ksk_index, - uint32_t output_size, void *context); + uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t output_size, + uint32_t ksk_index, void *context); void stream_emulator_make_memref_bootstrap_lwe_u64_process( void *dfg, void *sin1, void *sin2, void *sout, uint32_t input_lwe_dim, uint32_t poly_size, uint32_t level, uint32_t base_log, uint32_t glwe_dim, - uint32_t bsk_index, uint32_t output_size, void *context); + uint32_t output_size, uint32_t bsk_index, void *context); void stream_emulator_make_memref_batched_add_lwe_ciphertexts_u64_process( void *dfg, void *sin1, void *sin2, void *sout); @@ -57,16 +57,16 @@ void stream_emulator_make_memref_batched_negate_lwe_ciphertext_u64_process( void *dfg, void *sin1, void *sout); void stream_emulator_make_memref_batched_keyswitch_lwe_u64_process( void *dfg, void *sin1, void *sout, uint32_t level, uint32_t base_log, - uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t ksk_index, - uint32_t output_size, void *context); + uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t output_size, + uint32_t ksk_index, void *context); void stream_emulator_make_memref_batched_bootstrap_lwe_u64_process( void *dfg, void *sin1, void *sin2, void *sout, uint32_t input_lwe_dim, uint32_t poly_size, uint32_t level, uint32_t base_log, uint32_t glwe_dim, - uint32_t bsk_index, uint32_t output_size, void *context); + uint32_t output_size, uint32_t bsk_index, void *context); void stream_emulator_make_memref_batched_mapped_bootstrap_lwe_u64_process( void *dfg, void *sin1, void *sin2, void *sout, uint32_t input_lwe_dim, uint32_t poly_size, uint32_t level, uint32_t base_log, uint32_t glwe_dim, - uint32_t bsk_index, uint32_t output_size, void *context); + uint32_t output_size, uint32_t bsk_index, void *context); void *stream_emulator_make_uint64_stream(const char *name, stream_type stype); void stream_emulator_put_uint64(void *stream, uint64_t e); diff --git a/compilers/concrete-compiler/compiler/lib/Runtime/CMakeLists.txt b/compilers/concrete-compiler/compiler/lib/Runtime/CMakeLists.txt index db8bb7715..5bdd7e403 100644 --- a/compilers/concrete-compiler/compiler/lib/Runtime/CMakeLists.txt +++ b/compilers/concrete-compiler/compiler/lib/Runtime/CMakeLists.txt @@ -1,5 +1,6 @@ if(CONCRETELANG_CUDA_SUPPORT) add_library(ConcretelangRuntime SHARED context.cpp wrappers.cpp DFRuntime.cpp GPUDFG.cpp) + target_link_libraries(ConcretelangRuntime PRIVATE hwloc) else() add_library(ConcretelangRuntime SHARED context.cpp wrappers.cpp DFRuntime.cpp StreamEmulator.cpp) endif() diff --git a/compilers/concrete-compiler/compiler/lib/Runtime/GPUDFG.cpp b/compilers/concrete-compiler/compiler/lib/Runtime/GPUDFG.cpp index 835477b07..ba5b0e03a 100644 --- a/compilers/concrete-compiler/compiler/lib/Runtime/GPUDFG.cpp +++ b/compilers/concrete-compiler/compiler/lib/Runtime/GPUDFG.cpp @@ 
-5,6 +5,8 @@
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -32,13 +34,29 @@ namespace concretelang {
 namespace gpu_dfg {
 namespace {
 
+// When not using all accelerators on the machine, we distribute work
+// by assigning each SDFG its default accelerator in round-robin
+// order.
 static std::atomic<size_t> next_device = {0};
-static size_t num_devices = 0;
+// Resources available on the machine (or as requested by the user
+// through environment variables). Defaults to using all available.
+static size_t num_devices = 0; // Set SDFG_NUM_GPUS to configure
+static size_t num_cores = 1;   // Set OMP_NUM_THREADS to configure (as
+                               // this is linked to loop parallelism)
+
+// By default we distribute batched ops across all available GPUs
+// (or the value of the SDFG_NUM_GPUS environment variable, whichever
+// is lower). Set SDFG_DISTRIBUTE_BATCH_OPS=OFF to inhibit this.
+static bool dont_distribute_batched_ops = false;
+
+// Get the byte size of a rank 2 MemRef
 static inline size_t memref_get_data_size(MemRef2 &m) {
   return m.sizes[0] * m.sizes[1] * sizeof(uint64_t);
 }
 
+// Copy contiguous rank 2 MemRef 'in' to 'out'
 static inline void memref_copy_contiguous(MemRef2 &out, MemRef2 &in) {
   assert(in.sizes[0] == out.sizes[0] && in.sizes[1] == out.sizes[1] &&
          "memref_copy_contiguous sizes differ");
@@ -51,6 +69,7 @@ static inline void memref_copy_contiguous(MemRef2 &out, MemRef2 &in) {
                 memref_get_data_size(in));
 }
 
+// Copy contiguous rank 2 MemRef to a newly allocated, returned MemRef
 static inline MemRef2 memref_copy_alloc(MemRef2 &m) {
   uint64_t *data = (uint64_t *)malloc(memref_get_data_size(m));
   MemRef2 ret = {
@@ -59,6 +78,7 @@ static inline MemRef2 memref_copy_alloc(MemRef2 &m) {
   return ret;
 }
 
+// Parameter storage for KS and BS processes
 struct Void {};
 union Param {
   Void _;
@@ -68,9 +88,22 @@ union Context {
   Void _;
   RuntimeContext *val;
 };
+// Track location and state for dependences: the location is either an
+// integer >= 0 corresponding to the device/accelerator index on the
+// machine, or -1/-2 when the data is only available on the host or is
+// split across multiple locations.
 static const int32_t host_location = -1;
+static const int32_t split_location = -2;
+// Similarly, dependence chunks are either indexed (the index does not
+// always correlate to the device index on which they are located) or
+// the dependence is split further.
+static const int32_t single_chunk = -1;
+static const int32_t split_chunks = -2;
 struct Stream;
 struct Dependence;
+// Track the buffer/scratchpad for the PBS to avoid re-allocating it on
+// the device. Reuse where possible or reallocate if a larger buffer
+// is required.
 struct PBS_buffer {
   PBS_buffer(void *stream, uint32_t gpu_idx, uint32_t glwe_dimension,
              uint32_t polynomial_size, uint32_t input_lwe_ciphertext_count)
@@ -103,17 +136,54 @@ private:
   void *gpu_stream;
   uint32_t gpu_index;
 };
+
+// Keep track of the GPU/CUDA streams used for each accelerator and
+// the associated PBS buffer.
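// Illustrative sketch (not part of this patch): the lazy per-device pattern
// that GPU_state below implements, shown against the plain CUDA runtime API
// so it is self-contained. `DeviceResources`, `scratch` and `scratch_bytes`
// are hypothetical names standing in for the CUDA stream and PBS buffer.
#include <cuda_runtime.h>
#include <cstddef>

struct DeviceResources {
  int device;
  cudaStream_t stream = nullptr;
  void *scratch = nullptr;
  size_t scratch_bytes = 0;

  explicit DeviceResources(int d) : device(d) {}

  // Create the stream only the first time this device is actually used.
  cudaStream_t get_stream() {
    if (stream == nullptr) {
      cudaSetDevice(device);
      cudaStreamCreate(&stream);
    }
    return stream;
  }

  // Reuse the scratch buffer when it is large enough, otherwise regrow it.
  void *get_scratch(size_t bytes) {
    if (scratch == nullptr || bytes > scratch_bytes) {
      if (scratch != nullptr)
        cudaFreeAsync(scratch, get_stream());
      cudaSetDevice(device);
      cudaMallocAsync(&scratch, bytes, get_stream());
      scratch_bytes = bytes;
    }
    return scratch;
  }

  ~DeviceResources() {
    if (scratch != nullptr)
      cudaFree(scratch);
    if (stream != nullptr)
      cudaStreamDestroy(stream);
  }
};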
+struct GPU_state {
+  uint32_t gpu_idx;
+  void *gpu_stream;
+  PBS_buffer *pbs_buffer;
+  GPU_state(uint32_t idx)
+      : gpu_idx(idx), gpu_stream(nullptr), pbs_buffer(nullptr) {}
+  ~GPU_state() {
+    if (pbs_buffer != nullptr)
+      delete pbs_buffer;
+    if (gpu_stream != nullptr)
+      cuda_destroy_stream((cudaStream_t *)gpu_stream, gpu_idx);
+  }
+  inline int8_t *get_pbs_buffer(uint32_t glwe_dimension,
+                                uint32_t polynomial_size,
+                                uint32_t input_lwe_ciphertext_count) {
+    if (pbs_buffer == nullptr)
+      pbs_buffer = new PBS_buffer(get_gpu_stream(), gpu_idx, glwe_dimension,
+                                  polynomial_size, input_lwe_ciphertext_count);
+    return pbs_buffer->get_pbs_buffer(get_gpu_stream(), gpu_idx, glwe_dimension,
+                                      polynomial_size,
+                                      input_lwe_ciphertext_count);
+  }
+  inline void *get_gpu_stream() {
+    if (gpu_stream == nullptr)
+      gpu_stream = cuda_create_stream(gpu_idx);
+    return gpu_stream;
+  }
+};
+
+// Track resources required for the execution of a single DFG,
+// including the GPU states of the devices involved in its execution,
+// its streams, and memory whose deallocation depends on execution
+// progress on the accelerators. As execution on accelerators is
+// asynchronous, freeing must wait for the next synchronization point.
 struct GPU_DFG {
+  std::vector<GPU_state> gpus;
   uint32_t gpu_idx;
   void *gpu_stream;
   GPU_DFG(uint32_t idx) : gpu_idx(idx), pbs_buffer(nullptr) {
-    gpu_stream = cuda_create_stream(idx);
+    for (uint32_t i = 0; i < num_devices; ++i)
+      gpus.push_back(std::move(GPU_state(i)));
+    gpu_stream = gpus[idx].get_gpu_stream();
   }
   ~GPU_DFG() {
-    if (pbs_buffer != nullptr)
-      delete pbs_buffer;
     free_streams();
-    cuda_destroy_stream((cudaStream_t *)gpu_stream, gpu_idx);
     free_stream_order_dependent_data();
   }
   inline void register_stream(Stream *s) { streams.push_back(s); }
@@ -128,52 +198,117 @@ struct GPU_DFG {
   inline int8_t *get_pbs_buffer(uint32_t glwe_dimension,
                                 uint32_t polynomial_size,
                                 uint32_t input_lwe_ciphertext_count) {
-    if (pbs_buffer == nullptr)
-      pbs_buffer = new PBS_buffer(gpu_stream, gpu_idx, glwe_dimension,
-                                  polynomial_size, input_lwe_ciphertext_count);
+    if (pbs_buffer == nullptr) {
+      int8_t *ret = gpus[gpu_idx].get_pbs_buffer(
+          glwe_dimension, polynomial_size, input_lwe_ciphertext_count);
+      pbs_buffer = gpus[gpu_idx].pbs_buffer;
+      return ret;
+    }
     return pbs_buffer->get_pbs_buffer(gpu_stream, gpu_idx, glwe_dimension,
                                       polynomial_size,
                                       input_lwe_ciphertext_count);
   }
-  void drop_pbs_buffer() {
-    delete pbs_buffer;
-    pbs_buffer = nullptr;
-  }
   void free_streams();
+  inline void *get_gpu_stream(int32_t loc) {
+    if (loc < 0)
+      return nullptr;
+    return gpus[loc].get_gpu_stream();
+  }
 private:
   std::list<void *> to_free_list;
   std::list<Stream *> streams;
   PBS_buffer *pbs_buffer;
 };
+
+// Dependences track the location and state of each block of memory
+// used as input/output to processes, to allow either moving it on/off
+// devices or determining when deallocation is possible.
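// Illustrative sketch (not part of this patch): how split_dependence below
// divides the `num_samples` rows of a batch over `num_chunks` chunks -- the
// first (num_samples % num_chunks) chunks get one extra row, so chunk sizes
// differ by at most one. `chunk_rows` is a hypothetical helper name.
#include <cstddef>
#include <utility>
#include <vector>

// Returns (row offset, row count) pairs over the sample dimension.
static std::vector<std::pair<size_t, size_t>>
chunk_rows(size_t num_samples, size_t num_chunks) {
  std::vector<std::pair<size_t, size_t>> chunks;
  size_t base = num_samples / num_chunks;
  size_t remainder = num_samples % num_chunks;
  size_t offset = 0;
  for (size_t i = 0; i < num_chunks; ++i) {
    size_t rows = base + (i < remainder ? 1 : 0);
    chunks.emplace_back(offset, rows);
    offset += rows;
  }
  return chunks;
}
// Example: chunk_rows(10, 4) yields (0,3) (3,3) (6,2) (8,2).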
 struct Dependence {
   int32_t location;
   MemRef2 host_data;
   void *device_data;
   bool onHostReady;
   bool hostAllocated;
+  int32_t chunk_id;
+  std::vector<Dependence *> chunks;
   bool used;
-  Dependence(int32_t l, MemRef2 hd, void *dd, bool ohr, bool alloc = false)
+  Dependence(int32_t l, MemRef2 hd, void *dd, bool ohr, bool alloc = false,
+             int32_t chunk_id = single_chunk)
       : location(l), host_data(hd), device_data(dd), onHostReady(ohr),
-        hostAllocated(alloc), used(false) {}
-  Dependence(int32_t l, uint64_t val, void *dd, bool ohr, bool alloc = false)
+        hostAllocated(alloc), chunk_id(chunk_id), used(false) {}
+  Dependence(int32_t l, uint64_t val, void *dd, bool ohr, bool alloc = false,
+             int32_t chunk_id = single_chunk)
       : location(l), device_data(dd), onHostReady(ohr), hostAllocated(alloc),
-        used(false) {
+        chunk_id(chunk_id), used(false) {
     *host_data.aligned = val;
   }
-  inline void free_data(GPU_DFG *dfg) {
-    if (location >= 0) {
-      cuda_drop_async(device_data, (cudaStream_t *)dfg->gpu_stream, location);
+  // Split a dependence into a number of chunks either to run on
+  // multiple GPUs or execute concurrently on the host.
+  void split_dependence(size_t num_chunks, size_t chunk_dim, bool constant) {
+    assert(onHostReady && "Cannot split dependences located on a device.");
+    if (location == split_location) {
+      if (num_chunks != chunks.size())
+        warnx("WARNING: requesting to split dependence across different number "
+              "of chunks (%lu) than it already is split (%lu) which would "
+              "require remapping. This is not supported.",
+              num_chunks, chunks.size());
+      return;
+    }
+    size_t num_samples = host_data.sizes[chunk_dim];
+    assert(num_samples > 0);
+    // If this is a constant (same data for each chunk), then copy a
+    // descriptor corresponding to the whole dependence for each
+    // chunk.
+    if (constant) {
+      for (size_t i = 0; i < num_chunks; ++i) {
+        MemRef2 m = host_data;
+        m.allocated = nullptr;
+        chunks.push_back(
+            new Dependence(host_location, m, nullptr, onHostReady, false, i));
+      }
+      return;
+    }
+    size_t chunk_size = num_samples / num_chunks;
+    size_t chunk_remainder = num_samples % num_chunks;
+    uint64_t offset = 0;
+    for (size_t i = 0; i < num_chunks; ++i) {
+      size_t chunk_size_ = (i < chunk_remainder) ? chunk_size + 1 : chunk_size;
+      MemRef2 m = host_data;
+      m.sizes[chunk_dim] = chunk_size_;
+      m.offset = offset + host_data.offset;
+      void *dd = (device_data == nullptr) ? device_data
                                           : (uint64_t *)device_data + offset;
+      offset += chunk_size_ * host_data.strides[chunk_dim];
+      chunks.push_back(new Dependence(location, m, dd, onHostReady, false, i));
+    }
+    chunk_id = split_chunks;
+    location = split_location;
+  }
+  inline void free_data(GPU_DFG *dfg, bool immediate = false) {
+    if (location >= 0 && device_data != nullptr) {
+      cuda_drop_async(device_data,
+                      (cudaStream_t *)dfg->get_gpu_stream(location), location);
     }
     if (onHostReady && host_data.allocated != nullptr && hostAllocated) {
       // As streams are not synchronized aside from the GET operation,
       // we cannot free host-side data until after the synchronization
       // point as it could still be used by an asynchronous operation.
- dfg->register_stream_order_dependent_allocation(host_data.allocated); + if (immediate) + free(host_data.allocated); + else + dfg->register_stream_order_dependent_allocation(host_data.allocated); } + for (auto c : chunks) + c->free_data(dfg, immediate); + chunks.clear(); delete (this); } }; + +// Set of input/output streams required to execute a process' +// activation, along with any parameters (for KS / BS) and the +// associated work-function. struct Process { std::vector input_streams; std::vector output_streams; @@ -187,24 +322,43 @@ struct Process { Param sk_index; Param output_size; Context ctx; - void (*fun)(Process *); + void (*fun)(Process *, int32_t, uint64_t *); char name[80]; + bool batched_process; }; -static inline void schedule_kernel(Process *p) { - std::cout << " Scheduling a " << p->name << " on GPU " << p->dfg->gpu_idx - << "\n"; - p->fun(p); +void memref_keyswitch_lwe_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr); +void memref_bootstrap_lwe_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr); +void memref_add_lwe_ciphertexts_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr); +void memref_add_plaintext_lwe_ciphertext_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr); +void memref_mul_cleartext_lwe_ciphertext_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr); +void memref_negate_lwe_ciphertext_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr); +static inline void schedule_kernel(Process *p, uint32_t loc, + uint64_t *out_ptr) { + p->fun(p, loc, out_ptr); } - struct Stream { - stream_type type; Dependence *dep; + Dependence *saved_dependence; + stream_type type; Process *producer; std::vector consumers; GPU_DFG *dfg; + bool batched_stream; + bool const_stream; // When a batched op uses the same value on this + // stream for all ops + bool ct_stream; + bool pt_stream; Stream(stream_type t) - : type(t), dep(nullptr), producer(nullptr), dfg(nullptr) {} + : dep(nullptr), type(t), producer(nullptr), dfg(nullptr), + batched_stream(false), const_stream(false), ct_stream(false), + pt_stream(false) {} ~Stream() { if (dep != nullptr) dep->free_data(dfg); @@ -212,31 +366,27 @@ struct Stream { delete producer; } void put(Dependence *d) { - if (type == TS_STREAM_TYPE_X86_TO_TOPO_LSAP) { - assert(d->onHostReady && - "Host-to-device stream should have data initially on host."); - size_t data_size = memref_get_data_size(d->host_data); - d->device_data = cuda_malloc_async( - data_size, (cudaStream_t *)dfg->gpu_stream, dfg->gpu_idx); - cuda_memcpy_async_to_gpu( - d->device_data, d->host_data.aligned + d->host_data.offset, data_size, - (cudaStream_t *)dfg->gpu_stream, dfg->gpu_idx); - d->location = dfg->gpu_idx; - } - if (type == TS_STREAM_TYPE_TOPO_TO_TOPO_LSAP) - assert(d->location == (int32_t)dfg->gpu_idx && - "Data transfers between GPUs not supported yet"); - // TODO: in case of TS_STREAM_TYPE_TOPO_TO_X86_LSAP, we could - // initiate transfer back to host early here - but need to - // allocate memory and then copy out again. Tradeoff might be - // worth testing. - // If a dependence was already present, schedule deallocation. 
if (dep != nullptr) dep->free_data(dfg); dep = d; } - void schedule_work() { + void eager_dependence_deallocation() { + // If there's no producer process for this stream, it is fed by + // the control program - nothing to do + if (producer == nullptr) + return; + // Recursively go up the DFG to check if new data is available + for (auto s : producer->input_streams) + s->eager_dependence_deallocation(); + if (dep != nullptr) { + dep->free_data(dfg, true); + dep = nullptr; + } + } + // For a given dependence, traverse the DFG backwards to extract the lattice + // of kernels required to execute to produce this data + void extract_producing_graph(std::list &queue) { // If there's no producer process for this stream, it is fed by // the control program - nothing to do if (producer == nullptr) { @@ -245,41 +395,243 @@ struct Stream { } // Recursively go up the DFG to check if new data is available for (auto s : producer->input_streams) - s->schedule_work(); + s->extract_producing_graph(queue); // Check if any of the inputs have changed - and if so recompute // this value. Do not recompute if no changes. for (auto s : producer->input_streams) if (dep == nullptr || s->dep->used == false) { - schedule_kernel(producer); + queue.push_back(producer); break; } } - Dependence *get_on_host(MemRef2 &out, bool has_scheduled = false) { - if (!has_scheduled) - schedule_work(); + void schedule_work(MemRef2 &out) { + std::list queue; + extract_producing_graph(queue); + if (queue.empty()) + return; + + // TODO : replace with on-cpu execution, see if can be parallelised + // Do this for subgraphs that don't use BSes + bool is_batched_subgraph = false; + bool subgraph_bootstraps = false; + for (auto p : queue) { + is_batched_subgraph |= p->batched_process; + subgraph_bootstraps |= (p->fun == memref_bootstrap_lwe_u64_process); + } + // If this subgraph is not batched, then use this DFG's allocated + // GPU to offload to. + if (!is_batched_subgraph) { + for (auto p : queue) + schedule_kernel(p, (subgraph_bootstraps) ? dfg->gpu_idx : host_location, + nullptr); + return; + } + + // Identify all inputs to these processes that are not also + // outputs (i.e. data not produced within this subgraph) - and + // outputs of the subgraph. + std::list inputs; + std::list inputs_all; + std::list outputs; + std::list intermediate_values; + for (auto p : queue) { + inputs.insert(inputs.end(), p->input_streams.begin(), + p->input_streams.end()); + outputs.insert(outputs.end(), p->output_streams.begin(), + p->output_streams.end()); + } + inputs.sort(); + inputs.unique(); + inputs_all = inputs; + outputs.sort(); + outputs.unique(); + intermediate_values = outputs; + for (auto o : outputs) + inputs.remove(o); + for (auto i : inputs_all) + outputs.remove(i); + for (auto o : outputs) + intermediate_values.remove(o); + assert(!inputs.empty() && !outputs.empty()); + + // Decide on number of chunks to split -- TODO: refine this + size_t mem_per_sample = 0; + size_t const_mem_per_sample = 0; + size_t num_samples = 1; + size_t num_real_inputs = 0; + auto add_size = [&](Stream *s) { + // Const streams data is required in whole for each computation, + // we treat this separately as it can be substantial. 
+ if (s->const_stream) { + const_mem_per_sample += memref_get_data_size(s->dep->host_data); + return; + } + // If this is a ciphertext + if (s->ct_stream) { + mem_per_sample += s->dep->host_data.sizes[1] * sizeof(uint64_t); + num_real_inputs++; + if (s->dep->host_data.sizes[0] > num_samples) + num_samples = s->dep->host_data.sizes[0]; + } else { + mem_per_sample += sizeof(uint64_t); + } + }; + for (auto i : inputs) + add_size(i); + // Approximate the memory required for intermediate values and outputs + mem_per_sample += mem_per_sample * + (outputs.size() + intermediate_values.size()) / + (num_real_inputs ? num_real_inputs : 1); + + // If the subgraph does not have sufficient computational + // intensity (which we approximate by whether it bootstraps), then + // we assume (FIXME- confirm with profiling) that it is not + // beneficial to offload to GPU. + if (!subgraph_bootstraps) { + // TODO: We can split up the chunk into enough pieces to run across + // the host cores + for (auto p : queue) { + schedule_kernel(p, host_location, nullptr); + } + // We will assume that only one subgraph is being processed per + // DFG at a time, so we can safely free these here. + dfg->free_stream_order_dependent_data(); + return; + } // else + + // Do schedule on GPUs + size_t gpu_free_mem; + size_t gpu_total_mem; + auto status = cudaMemGetInfo(&gpu_free_mem, &gpu_total_mem); + assert(status == cudaSuccess); + + // TODO - for now assume each device on the system has roughly same + // available memory. + size_t available_mem = gpu_free_mem; + // Further assume (FIXME) that kernel execution requires twice as much + // memory per sample + size_t num_samples_per_chunk = + (available_mem - const_mem_per_sample) / (mem_per_sample * 2); + size_t num_chunks = num_samples / num_samples_per_chunk + + ((num_samples % num_samples_per_chunk) ? 1 : 0); + // If we don't have enough samples, restrict the number of devices to use + int32_t num_devices_to_use = + (num_devices < num_samples) ? num_devices : num_samples; + // Make number of chunks multiple of number of devices. + num_chunks = (num_chunks / num_devices_to_use + + ((num_chunks % num_devices_to_use) ? 1 : 0)) * + num_devices_to_use; + int32_t target_device = 0; + for (auto i : inputs) { + i->dep->split_dependence(num_chunks, (i->ct_stream) ? 0 : 1, + i->const_stream); + // Keep the original dependence as it may be required as input + // outside of this subgraph. 
+ i->dep->used = true; + i->saved_dependence = i->dep; + // Setting this to null prevents deallocation of the saved dependence + i->dep = nullptr; + } + // Prepare space for writing outputs + for (auto o : outputs) { + assert(o->batched_stream && o->ct_stream && + "Only operations with ciphertext output supported."); + MemRef2 output = out; + size_t data_size = memref_get_data_size(out); + output.allocated = output.aligned = (uint64_t *)malloc(data_size); + output.offset = 0; + o->saved_dependence = new Dependence(host_location, output, nullptr, true, + true, single_chunk); + } + for (size_t chunk = 0; chunk < num_chunks; chunk += num_devices_to_use) { + for (size_t c = chunk; c < chunk + num_devices_to_use; ++c) { + for (auto i : inputs) + i->put(i->saved_dependence->chunks[c]); + for (auto p : queue) { + schedule_kernel(p, target_device, nullptr); + } + for (auto o : outputs) { + o->saved_dependence->chunks.push_back(o->dep); + o->dep = nullptr; + } + target_device = (target_device + 1) % num_devices; + } + // Once we've scheduled work on all devices, we can go gather up the + // outputs + for (auto o : outputs) { + for (auto d : o->saved_dependence->chunks) { + // Write out the piece in the final target dependence + size_t csize = memref_get_data_size(d->host_data); + cuda_memcpy_async_to_cpu( + ((char *)o->saved_dependence->host_data.aligned) + + o->saved_dependence->host_data.offset, + d->device_data, csize, + (cudaStream_t *)dfg->get_gpu_stream(d->location), d->location); + d->free_data(dfg); + o->saved_dependence->host_data.offset += csize; + } + o->saved_dependence->chunks.clear(); + } + } + // Restore the saved_dependence and deallocate the last input chunks. + for (auto i : inputs) { + i->put(i->saved_dependence); + i->dep->chunks.clear(); + i->saved_dependence = nullptr; + } + for (auto o : outputs) { + o->saved_dependence->host_data.offset = 0; + o->put(o->saved_dependence); + o->saved_dependence = nullptr; + } + // Force deallocation and clearing of all inner dependences which + // are invalid outside of this chunking context. + for (auto iv : intermediate_values) + iv->put(nullptr); + } + Dependence *get_on_host(MemRef2 &out) { + schedule_work(out); assert(dep != nullptr && "GET on empty stream not allowed."); dep->used = true; // If this was already copied to host, copy out if (dep->onHostReady) { memref_copy_contiguous(out, dep->host_data); + return dep; + } else if (dep->location == split_location) { + char *pos = (char *)(out.aligned + out.offset); + // dep->chunks.sort(order_dependence_chunks); + std::list devices_used; + for (auto c : dep->chunks) { + size_t data_size = memref_get_data_size(c->host_data); + cuda_memcpy_async_to_cpu( + pos, c->device_data, data_size, + (cudaStream_t *)dfg->get_gpu_stream(c->location), c->location); + pos += data_size; + devices_used.push_back(c->location); + } + // We should only synchronize devices that had data chunks + devices_used.sort(); + devices_used.unique(); + for (auto i : devices_used) + cudaStreamSynchronize(*(cudaStream_t *)dfg->get_gpu_stream(i)); } else { size_t data_size = memref_get_data_size(dep->host_data); cuda_memcpy_async_to_cpu(out.aligned + out.offset, dep->device_data, data_size, (cudaStream_t *)dfg->gpu_stream, dep->location); cudaStreamSynchronize(*(cudaStream_t *)dfg->gpu_stream); - // After this synchronization point, all of the host-side - // allocated memory can be freed as we know all asynchronous - // operations have finished. 
- dfg->free_stream_order_dependent_data(); - dep->host_data = memref_copy_alloc(out); - dep->onHostReady = true; - dep->hostAllocated = true; } + // After this synchronization point, all of the host-side + // allocated memory can be freed as we know all asynchronous + // operations have finished. + dfg->free_stream_order_dependent_data(); + if (!dep->hostAllocated) + dep->host_data = memref_copy_alloc(out); + dep->onHostReady = true; + dep->hostAllocated = true; return dep; } - Dependence *get(int32_t location) { - schedule_work(); + Dependence *get(int32_t location, size_t num_chunks = 1) { assert(dep != nullptr && "Dependence could not be computed."); dep->used = true; if (location == host_location) { @@ -289,12 +641,42 @@ struct Stream { dep->host_data.allocated = dep->host_data.aligned = (uint64_t *)malloc(data_size); dep->hostAllocated = true; - get_on_host(dep->host_data, true); + get_on_host(dep->host_data); + return dep; + } else if (location == split_location) { + if (dep->location == host_location && dep->onHostReady) { + dep->split_dependence(num_chunks, 0, false); + for (auto c : dep->chunks) { + assert(c->chunk_id >= 0); + c->location = c->chunk_id % num_devices; + size_t data_size = memref_get_data_size(c->host_data); + c->device_data = cuda_malloc_async( + data_size, (cudaStream_t *)dfg->get_gpu_stream(c->location), + c->location); + cuda_memcpy_async_to_gpu( + c->device_data, c->host_data.aligned + c->host_data.offset, + data_size, (cudaStream_t *)dfg->get_gpu_stream(c->location), + c->location); + } + } else { + assert(dep->location == split_location); + } + return dep; + } else { + // In case this dependence is needed on a single device + if (dep->location == location) + return dep; + assert(dep->onHostReady && + "Device-to-device data transfers not supported yet."); + size_t data_size = memref_get_data_size(dep->host_data); + dep->device_data = cuda_malloc_async( + data_size, (cudaStream_t *)dfg->get_gpu_stream(location), location); + cuda_memcpy_async_to_gpu( + dep->device_data, dep->host_data.aligned + dep->host_data.offset, + data_size, (cudaStream_t *)dfg->get_gpu_stream(location), location); + dep->location = location; return dep; } - assert(dep->location == location && - "Multi-GPU within the same SDFG not supported"); - return dep; } }; @@ -306,7 +688,8 @@ void GPU_DFG::free_streams() { } static inline mlir::concretelang::gpu_dfg::Process * -make_process_1_1(void *dfg, void *sin1, void *sout, void (*fun)(Process *)) { +make_process_1_1(void *dfg, void *sin1, void *sout, + void (*fun)(Process *, int32_t, uint64_t *)) { mlir::concretelang::gpu_dfg::Process *p = new mlir::concretelang::gpu_dfg::Process; mlir::concretelang::gpu_dfg::Stream *s1 = @@ -322,12 +705,13 @@ make_process_1_1(void *dfg, void *sin1, void *sout, void (*fun)(Process *)) { so->dfg = s1->dfg = (GPU_DFG *)dfg; p->dfg->register_stream(s1); p->dfg->register_stream(so); + p->batched_process = s1->batched_stream; return p; } static inline mlir::concretelang::gpu_dfg::Process * make_process_2_1(void *dfg, void *sin1, void *sin2, void *sout, - void (*fun)(Process *)) { + void (*fun)(Process *, int32_t, uint64_t *)) { mlir::concretelang::gpu_dfg::Process *p = new mlir::concretelang::gpu_dfg::Process; mlir::concretelang::gpu_dfg::Stream *s1 = @@ -348,9 +732,17 @@ make_process_2_1(void *dfg, void *sin1, void *sin2, void *sout, p->dfg->register_stream(s1); p->dfg->register_stream(s2); p->dfg->register_stream(so); + p->batched_process = s1->batched_stream; return p; } +[[maybe_unused]] static void 
sdfg_gpu_debug_print_mref(const char *c, + MemRef2 m) { + std::cout << c << " : " << m.allocated << ", " << m.aligned << ", " + << m.offset << ", [" << m.sizes[0] << ", " << m.sizes[1] << "], [" + << m.strides[0] << ", " << m.strides[1] << "]\n"; +} + [[maybe_unused]] static MemRef2 sdfg_gpu_debug_dependence(Dependence *d, cudaStream_t *s) { if (d->onHostReady) @@ -384,37 +776,52 @@ sdfg_gpu_debug_compare_memref(MemRef2 &a, MemRef2 &b, char const *msg) { } // Stream emulator processes -void memref_keyswitch_lwe_u64_process(Process *p) { +void memref_keyswitch_lwe_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr) { assert(p->sk_index.val == 0 && "multiple ksk is not yet implemented on GPU"); - Dependence *idep = p->input_streams[0]->get(p->dfg->gpu_idx); - uint64_t num_samples = idep->host_data.sizes[0]; - MemRef2 out = { - 0, 0, 0, {num_samples, p->output_size.val}, {p->output_size.val, 1}}; - void *ct0_gpu = idep->device_data; - void *ksk_gpu = p->ctx.val->get_ksk_gpu( - p->level.val, p->input_lwe_dim.val, p->output_lwe_dim.val, - p->dfg->gpu_idx, (cudaStream_t *)p->dfg->gpu_stream); - size_t data_size = memref_get_data_size(out); - void *out_gpu = cuda_malloc_async( - data_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - // Schedule the keyswitch kernel on the GPU - cuda_keyswitch_lwe_ciphertext_vector_64( - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx, out_gpu, ct0_gpu, - ksk_gpu, p->input_lwe_dim.val, p->output_lwe_dim.val, p->base_log.val, - p->level.val, num_samples); - Dependence *dep = - new Dependence((int32_t)p->dfg->gpu_idx, out, out_gpu, false); - p->output_streams[0]->put(dep); + auto sched = [&](Dependence *d) { + uint64_t num_samples = d->host_data.sizes[0]; + MemRef2 out = { + 0, 0, 0, {num_samples, p->output_size.val}, {p->output_size.val, 1}}; + size_t data_size = memref_get_data_size(out); + if (loc == host_location) { + // If it is not profitable to offload, schedule kernel on CPU + out.allocated = out.aligned = + (uint64_t *)((out_ptr != nullptr) ? 
out_ptr : malloc(data_size)); + memref_batched_keyswitch_lwe_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d->host_data.allocated, + d->host_data.aligned, d->host_data.offset, d->host_data.sizes[0], + d->host_data.sizes[1], d->host_data.strides[0], + d->host_data.strides[1], p->level.val, p->base_log.val, + p->input_lwe_dim.val, p->output_lwe_dim.val, p->sk_index.val, + p->ctx.val); + Dependence *dep = + new Dependence(loc, out, nullptr, true, true, d->chunk_id); + return dep; + } else { + // Schedule the keyswitch kernel on the GPU + cudaStream_t *s = (cudaStream_t *)p->dfg->get_gpu_stream(loc); + void *ct0_gpu = d->device_data; + void *out_gpu = cuda_malloc_async(data_size, s, loc); + void *ksk_gpu = p->ctx.val->get_ksk_gpu( + p->level.val, p->input_lwe_dim.val, p->output_lwe_dim.val, loc, s); + cuda_keyswitch_lwe_ciphertext_vector_64( + s, loc, out_gpu, ct0_gpu, ksk_gpu, p->input_lwe_dim.val, + p->output_lwe_dim.val, p->base_log.val, p->level.val, num_samples); + Dependence *dep = + new Dependence(loc, out, out_gpu, false, false, d->chunk_id); + return dep; + } + }; + Dependence *idep = p->input_streams[0]->get(loc); + p->output_streams[0]->put(sched(idep)); } -void memref_bootstrap_lwe_u64_process(Process *p) { +void memref_bootstrap_lwe_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr) { assert(p->sk_index.val == 0 && "multiple bsk is not yet implemented on GPU"); assert(p->output_size.val == p->glwe_dim.val * p->poly_size.val + 1); - void *fbsk_gpu = p->ctx.val->get_bsk_gpu( - p->input_lwe_dim.val, p->poly_size.val, p->level.val, p->glwe_dim.val, - p->dfg->gpu_idx, (cudaStream_t *)p->dfg->gpu_stream); - Dependence *idep0 = p->input_streams[0]->get(p->dfg->gpu_idx); - void *ct0_gpu = idep0->device_data; Dependence *idep1 = p->input_streams[1]->get(host_location); MemRef2 &mtlu = idep1->host_data; @@ -434,116 +841,302 @@ void memref_bootstrap_lwe_u64_process(Process *p) { glwe_ct[pos++] = tlu[postlu++]; } } - void *glwe_ct_gpu = cuda_malloc_async( - glwe_ct_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cuda_memcpy_async_to_gpu(glwe_ct_gpu, glwe_ct, glwe_ct_size, - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - uint64_t num_samples = idep0->host_data.sizes[0]; - MemRef2 out = { - 0, 0, 0, {num_samples, p->output_size.val}, {p->output_size.val, 1}}; - size_t data_size = memref_get_data_size(out); - void *out_gpu = cuda_malloc_async( - data_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cudaMemsetAsync(out_gpu, 0, data_size, *(cudaStream_t *)p->dfg->gpu_stream); + auto sched = [&](Dependence *d0, Dependence *d1, uint64_t *glwe_ct, + std::vector &lut_indexes, cudaStream_t *s, + int32_t loc) { + uint64_t num_samples = d0->host_data.sizes[0]; + MemRef2 out = { + 0, 0, 0, {num_samples, p->output_size.val}, {p->output_size.val, 1}}; + size_t data_size = memref_get_data_size(out); - // Move test vector indexes to the GPU, the test vector indexes is set of 0 - uint32_t lwe_idx = 0, test_vector_idxes_size = num_samples * sizeof(uint64_t); - uint64_t *test_vector_idxes = (uint64_t *)malloc(test_vector_idxes_size); - if (num_lut_vectors == 1) { - memset((void *)test_vector_idxes, 0, test_vector_idxes_size); + // Move test vector indexes to the GPU, the test vector indexes is set of 0 + uint32_t lwe_idx = 0, + test_vector_idxes_size = num_samples * sizeof(uint64_t); + uint64_t *test_vector_idxes = (uint64_t *)malloc(test_vector_idxes_size); + if (lut_indexes.size() == 1) { + memset((void 
*)test_vector_idxes, lut_indexes[0], test_vector_idxes_size); + } else { + assert(lut_indexes.size() == num_samples); + for (size_t i = 0; i < num_samples; ++i) + test_vector_idxes[i] = lut_indexes[i]; + } + if (loc == host_location) { + // If it is not profitable to offload, schedule kernel on CPU + out.allocated = out.aligned = + (uint64_t *)((out_ptr != nullptr) ? out_ptr : malloc(data_size)); + if (lut_indexes.size() == 1) + memref_batched_bootstrap_lwe_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], d1->host_data.allocated, + d1->host_data.aligned, d1->host_data.offset, d1->host_data.sizes[1], + d1->host_data.strides[1], p->input_lwe_dim.val, p->poly_size.val, + p->level.val, p->base_log.val, p->glwe_dim.val, p->sk_index.val, + p->ctx.val); + else + memref_batched_mapped_bootstrap_lwe_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], d1->host_data.allocated, + d1->host_data.aligned, d1->host_data.offset, d1->host_data.sizes[0], + d1->host_data.sizes[1], d1->host_data.strides[0], + d1->host_data.strides[1], p->input_lwe_dim.val, p->poly_size.val, + p->level.val, p->base_log.val, p->glwe_dim.val, p->sk_index.val, + p->ctx.val); + Dependence *dep = + new Dependence(loc, out, nullptr, true, true, d0->chunk_id); + free(glwe_ct); + return dep; + } else { + // Schedule the bootstrap kernel on the GPU + void *glwe_ct_gpu = cuda_malloc_async(glwe_ct_size, s, loc); + cuda_memcpy_async_to_gpu(glwe_ct_gpu, glwe_ct, glwe_ct_size, s, loc); + void *test_vector_idxes_gpu = + cuda_malloc_async(test_vector_idxes_size, s, loc); + cuda_memcpy_async_to_gpu(test_vector_idxes_gpu, (void *)test_vector_idxes, + test_vector_idxes_size, s, loc); + int8_t *pbs_buffer = p->dfg->gpus[loc].get_pbs_buffer( + p->glwe_dim.val, p->poly_size.val, num_samples); + void *ct0_gpu = d0->device_data; + void *out_gpu = cuda_malloc_async(data_size, s, loc); + void *fbsk_gpu = + p->ctx.val->get_bsk_gpu(p->input_lwe_dim.val, p->poly_size.val, + p->level.val, p->glwe_dim.val, loc, s); + cuda_bootstrap_amortized_lwe_ciphertext_vector_64( + s, loc, out_gpu, glwe_ct_gpu, test_vector_idxes_gpu, ct0_gpu, + fbsk_gpu, (int8_t *)pbs_buffer, p->input_lwe_dim.val, p->glwe_dim.val, + p->poly_size.val, p->base_log.val, p->level.val, num_samples, + lut_indexes.size(), lwe_idx, cuda_get_max_shared_memory(loc)); + cuda_drop_async(test_vector_idxes_gpu, s, loc); + cuda_drop_async(glwe_ct_gpu, s, loc); + Dependence *dep = + new Dependence(loc, out, out_gpu, false, false, d0->chunk_id); + // As streams are not synchronized, we can only free this vector + // after a later synchronization point where we are guaranteed that + // this vector is no longer needed. 
+ p->dfg->register_stream_order_dependent_allocation(test_vector_idxes); + p->dfg->register_stream_order_dependent_allocation(glwe_ct); + return dep; + } + }; + + // If this is a mapped TLU + // FIXME: for now we do not provide more advanced ways of selecting + bool mapped = (p->input_streams[1]->dep->host_data.sizes[0] > 1); + std::vector lut_indexes; + if (mapped) { + lut_indexes.resize(num_lut_vectors); + std::iota(lut_indexes.begin(), lut_indexes.end(), 0); } else { - assert(num_lut_vectors == num_samples); - for (size_t i = 0; i < num_lut_vectors; ++i) - test_vector_idxes[i] = i; + lut_indexes.push_back(0); } - void *test_vector_idxes_gpu = - cuda_malloc_async(test_vector_idxes_size, - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cuda_memcpy_async_to_gpu(test_vector_idxes_gpu, (void *)test_vector_idxes, - test_vector_idxes_size, - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - // Schedule the bootstrap kernel on the GPU - int8_t *pbs_buffer = - p->dfg->get_pbs_buffer(p->glwe_dim.val, p->poly_size.val, num_samples); - cuda_bootstrap_amortized_lwe_ciphertext_vector_64( - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx, out_gpu, glwe_ct_gpu, - test_vector_idxes_gpu, ct0_gpu, fbsk_gpu, (int8_t *)pbs_buffer, - p->input_lwe_dim.val, p->glwe_dim.val, p->poly_size.val, p->base_log.val, - p->level.val, num_samples, num_lut_vectors, lwe_idx, - cuda_get_max_shared_memory(p->dfg->gpu_idx)); - cuda_drop_async(test_vector_idxes_gpu, (cudaStream_t *)p->dfg->gpu_stream, - p->dfg->gpu_idx); - Dependence *dep = - new Dependence((int32_t)p->dfg->gpu_idx, out, out_gpu, false); - // As streams are not synchronized, we can only free this vector - // after a later synchronization point where we are guaranteed that - // this vector is no longer needed. 
- p->dfg->register_stream_order_dependent_allocation(test_vector_idxes); - p->output_streams[0]->put(dep); + + cudaStream_t *cstream = (cudaStream_t *)p->dfg->get_gpu_stream(loc); + Dependence *idep0 = p->input_streams[0]->get(loc); + p->output_streams[0]->put( + sched(idep0, idep1, glwe_ct, lut_indexes, cstream, loc)); } -void memref_add_lwe_ciphertexts_u64_process(Process *p) { - Dependence *idep0 = p->input_streams[0]->get(p->dfg->gpu_idx); - Dependence *idep1 = p->input_streams[1]->get(p->dfg->gpu_idx); - MemRef2 ct0 = idep0->host_data; - uint64_t num_samples = ct0.sizes[0]; - MemRef2 out = {0, 0, 0, {num_samples, ct0.sizes[1]}, {ct0.sizes[1], 1}}; - size_t data_size = memref_get_data_size(out); - void *out_gpu = cuda_malloc_async( - data_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cuda_add_lwe_ciphertext_vector_64( - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx, out_gpu, - idep0->device_data, idep1->device_data, ct0.sizes[1] - 1, num_samples); - Dependence *dep = new Dependence(p->dfg->gpu_idx, out, out_gpu, false); - p->output_streams[0]->put(dep); +void memref_add_lwe_ciphertexts_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr) { + auto sched = [&](Dependence *d0, Dependence *d1, cudaStream_t *s, + int32_t loc) { + assert(d0->host_data.sizes[0] == d1->host_data.sizes[0]); + assert(d0->host_data.sizes[1] == d1->host_data.sizes[1]); + assert(d0->location == d1->location); + assert(d0->chunk_id == d1->chunk_id); + uint64_t num_samples = d0->host_data.sizes[0]; + MemRef2 out = {0, + 0, + 0, + {num_samples, d0->host_data.sizes[1]}, + {d0->host_data.sizes[1], 1}}; + size_t data_size = memref_get_data_size(out); + if (loc == host_location) { + // If it is not profitable to offload, schedule kernel on CPU + out.allocated = out.aligned = + (uint64_t *)((out_ptr != nullptr) ? 
out_ptr : malloc(data_size)); + memref_batched_add_lwe_ciphertexts_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], d1->host_data.allocated, + d1->host_data.aligned, d1->host_data.offset, d1->host_data.sizes[0], + d1->host_data.sizes[1], d1->host_data.strides[0], + d1->host_data.strides[1]); + Dependence *dep = + new Dependence(loc, out, nullptr, true, true, d0->chunk_id); + return dep; + } else { + // Schedule the kernel on the GPU + void *out_gpu = cuda_malloc_async(data_size, s, loc); + cuda_add_lwe_ciphertext_vector_64( + s, loc, out_gpu, d0->device_data, d1->device_data, + d0->host_data.sizes[1] - 1, num_samples); + Dependence *dep = + new Dependence(loc, out, out_gpu, false, false, d0->chunk_id); + return dep; + } + }; + Dependence *idep0 = p->input_streams[0]->get(loc); + Dependence *idep1 = p->input_streams[1]->get(loc); + p->output_streams[0]->put( + sched(idep0, idep1, (cudaStream_t *)p->dfg->get_gpu_stream(loc), loc)); } -void memref_add_plaintext_lwe_ciphertext_u64_process(Process *p) { - Dependence *idep0 = p->input_streams[0]->get(p->dfg->gpu_idx); - Dependence *idep1 = p->input_streams[1]->get(p->dfg->gpu_idx); - MemRef2 ct0 = idep0->host_data; - uint64_t num_samples = ct0.sizes[0]; - MemRef2 out = {0, 0, 0, {num_samples, ct0.sizes[1]}, {ct0.sizes[1], 1}}; - size_t data_size = memref_get_data_size(out); - void *out_gpu = cuda_malloc_async( - data_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cuda_add_lwe_ciphertext_vector_plaintext_vector_64( - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx, out_gpu, - idep0->device_data, idep1->device_data, ct0.sizes[1] - 1, num_samples); - Dependence *dep = new Dependence(p->dfg->gpu_idx, out, out_gpu, false); - p->output_streams[0]->put(dep); +void memref_add_plaintext_lwe_ciphertext_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr) { + auto sched = [&](Dependence *d0, Dependence *d1, cudaStream_t *s, + int32_t loc) { + assert(d0->host_data.sizes[0] == d1->host_data.sizes[1] || + d1->host_data.sizes[1] == 1); + assert(d0->location == d1->location); + assert(d0->chunk_id == d1->chunk_id); + uint64_t num_samples = d0->host_data.sizes[0]; + MemRef2 out = {0, + 0, + 0, + {num_samples, d0->host_data.sizes[1]}, + {d0->host_data.sizes[1], 1}}; + size_t data_size = memref_get_data_size(out); + if (loc == host_location) { + // If it is not profitable to offload, schedule kernel on CPU + out.allocated = out.aligned = + (uint64_t *)((out_ptr != nullptr) ? 
out_ptr : malloc(data_size)); + if (d1->host_data.sizes[1] == 1) // Constant case + memref_batched_add_plaintext_cst_lwe_ciphertext_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], *d1->host_data.aligned); + else + memref_batched_add_plaintext_lwe_ciphertext_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], d1->host_data.allocated, + d1->host_data.aligned, d1->host_data.offset, d1->host_data.sizes[1], + d1->host_data.strides[1]); + Dependence *dep = + new Dependence(loc, out, nullptr, true, true, d0->chunk_id); + return dep; + } else { + // Schedule the kernel on the GPU + void *out_gpu = cuda_malloc_async(data_size, s, loc); + cuda_add_lwe_ciphertext_vector_plaintext_vector_64( + s, loc, out_gpu, d0->device_data, d1->device_data, + d0->host_data.sizes[1] - 1, num_samples); + Dependence *dep = + new Dependence(loc, out, out_gpu, false, false, d0->chunk_id); + return dep; + } + }; + Dependence *idep0 = p->input_streams[0]->get(loc); + Dependence *idep1 = p->input_streams[1]->get(loc); + p->output_streams[0]->put( + sched(idep0, idep1, (cudaStream_t *)p->dfg->get_gpu_stream(loc), loc)); } -void memref_mul_cleartext_lwe_ciphertext_u64_process(Process *p) { - Dependence *idep0 = p->input_streams[0]->get(p->dfg->gpu_idx); - Dependence *idep1 = p->input_streams[1]->get(p->dfg->gpu_idx); - MemRef2 ct0 = idep0->host_data; - uint64_t num_samples = ct0.sizes[0]; - MemRef2 out = {0, 0, 0, {num_samples, ct0.sizes[1]}, {ct0.sizes[1], 1}}; - size_t data_size = memref_get_data_size(out); - void *out_gpu = cuda_malloc_async( - data_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cuda_mult_lwe_ciphertext_vector_cleartext_vector_64( - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx, out_gpu, - idep0->device_data, idep1->device_data, ct0.sizes[1] - 1, num_samples); - Dependence *dep = new Dependence(p->dfg->gpu_idx, out, out_gpu, false); - p->output_streams[0]->put(dep); +void memref_mul_cleartext_lwe_ciphertext_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr) { + auto sched = [&](Dependence *d0, Dependence *d1, cudaStream_t *s, + int32_t loc) { + assert(d0->host_data.sizes[0] == d1->host_data.sizes[1] || + d1->host_data.sizes[1] == 1); + assert(d0->location == d1->location); + assert(d0->chunk_id == d1->chunk_id); + uint64_t num_samples = d0->host_data.sizes[0]; + MemRef2 out = {0, + 0, + 0, + {num_samples, d0->host_data.sizes[1]}, + {d0->host_data.sizes[1], 1}}; + size_t data_size = memref_get_data_size(out); + if (loc == host_location) { + // If it is not profitable to offload, schedule kernel on CPU + out.allocated = out.aligned = + (uint64_t *)((out_ptr != nullptr) ? 
out_ptr : malloc(data_size)); + if (d1->host_data.sizes[1] == 1) // Constant case + memref_batched_mul_cleartext_cst_lwe_ciphertext_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], *d1->host_data.aligned); + else + memref_batched_mul_cleartext_lwe_ciphertext_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1], d1->host_data.allocated, + d1->host_data.aligned, d1->host_data.offset, d1->host_data.sizes[1], + d1->host_data.strides[1]); + Dependence *dep = + new Dependence(loc, out, nullptr, true, true, d0->chunk_id); + return dep; + } else { + // Schedule the keyswitch kernel on the GPU + void *out_gpu = cuda_malloc_async(data_size, s, loc); + cuda_mult_lwe_ciphertext_vector_cleartext_vector_64( + s, loc, out_gpu, d0->device_data, d1->device_data, + d0->host_data.sizes[1] - 1, num_samples); + Dependence *dep = + new Dependence(loc, out, out_gpu, false, false, d0->chunk_id); + return dep; + } + }; + Dependence *idep0 = p->input_streams[0]->get(loc); + Dependence *idep1 = p->input_streams[1]->get(loc); + p->output_streams[0]->put( + sched(idep0, idep1, (cudaStream_t *)p->dfg->get_gpu_stream(loc), loc)); } -void memref_negate_lwe_ciphertext_u64_process(Process *p) { - Dependence *idep = p->input_streams[0]->get(p->dfg->gpu_idx); - MemRef2 ct0 = idep->host_data; - uint64_t num_samples = ct0.sizes[0]; - MemRef2 out = {0, 0, 0, {num_samples, ct0.sizes[1]}, {ct0.sizes[1], 1}}; - size_t data_size = memref_get_data_size(out); - void *out_gpu = cuda_malloc_async( - data_size, (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx); - cuda_negate_lwe_ciphertext_vector_64( - (cudaStream_t *)p->dfg->gpu_stream, p->dfg->gpu_idx, out_gpu, - idep->device_data, ct0.sizes[1] - 1, num_samples); - Dependence *dep = new Dependence(p->dfg->gpu_idx, out, out_gpu, false); - p->output_streams[0]->put(dep); +void memref_negate_lwe_ciphertext_u64_process(Process *p, int32_t loc, + uint64_t *out_ptr) { + auto sched = [&](Dependence *d0, cudaStream_t *s, int32_t loc) { + uint64_t num_samples = d0->host_data.sizes[0]; + MemRef2 out = {0, + 0, + 0, + {num_samples, d0->host_data.sizes[1]}, + {d0->host_data.sizes[1], 1}}; + size_t data_size = memref_get_data_size(out); + if (loc == host_location) { + // If it is not profitable to offload, schedule kernel on CPU + out.allocated = out.aligned = + (uint64_t *)((out_ptr != nullptr) ? 
out_ptr : malloc(data_size)); + memref_batched_negate_lwe_ciphertext_u64( + out.allocated, out.aligned, out.offset, out.sizes[0], out.sizes[1], + out.strides[0], out.strides[1], d0->host_data.allocated, + d0->host_data.aligned, d0->host_data.offset, d0->host_data.sizes[0], + d0->host_data.sizes[1], d0->host_data.strides[0], + d0->host_data.strides[1]); + Dependence *dep = + new Dependence(loc, out, nullptr, true, true, d0->chunk_id); + return dep; + } else { + // Schedule the kernel on the GPU + void *out_gpu = cuda_malloc_async(data_size, s, loc); + cuda_negate_lwe_ciphertext_vector_64(s, loc, out_gpu, d0->device_data, + d0->host_data.sizes[1] - 1, + num_samples); + Dependence *dep = + new Dependence(loc, out, out_gpu, false, false, d0->chunk_id); + return dep; + } + }; + Dependence *idep0 = p->input_streams[0]->get(loc); + p->output_streams[0]->put( + sched(idep0, (cudaStream_t *)p->dfg->get_gpu_stream(loc), loc)); } } // namespace @@ -591,8 +1184,8 @@ void stream_emulator_make_memref_negate_lwe_ciphertext_u64_process(void *dfg, void stream_emulator_make_memref_keyswitch_lwe_u64_process( void *dfg, void *sin1, void *sout, uint32_t level, uint32_t base_log, - uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t ksk_index, - uint32_t output_size, void *context) { + uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t output_size, + uint32_t ksk_index, void *context) { Process *p = make_process_1_1(dfg, sin1, sout, memref_keyswitch_lwe_u64_process); p->level.val = level; @@ -609,7 +1202,7 @@ void stream_emulator_make_memref_keyswitch_lwe_u64_process( void stream_emulator_make_memref_bootstrap_lwe_u64_process( void *dfg, void *sin1, void *sin2, void *sout, uint32_t input_lwe_dim, uint32_t poly_size, uint32_t level, uint32_t base_log, uint32_t glwe_dim, - uint32_t bsk_index, uint32_t output_size, void *context) { + uint32_t output_size, uint32_t bsk_index, void *context) { // The TLU does not need to be sent to GPU ((Stream *)sin2)->type = TS_STREAM_TYPE_X86_TO_X86_LSAP; Process *p = @@ -628,63 +1221,112 @@ void stream_emulator_make_memref_bootstrap_lwe_u64_process( void stream_emulator_make_memref_batched_add_lwe_ciphertexts_u64_process( void *dfg, void *sin1, void *sin2, void *sout) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->batched_stream = true; + ((Stream *)sin2)->ct_stream = true; stream_emulator_make_memref_add_lwe_ciphertexts_u64_process(dfg, sin1, sin2, sout); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_add_plaintext_lwe_ciphertext_u64_process( void *dfg, void *sin1, void *sin2, void *sout) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->batched_stream = true; + ((Stream *)sin2)->pt_stream = true; stream_emulator_make_memref_add_plaintext_lwe_ciphertext_u64_process( dfg, sin1, sin2, sout); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_add_plaintext_cst_lwe_ciphertext_u64_process( void *dfg, void *sin1, void *sin2, void *sout) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->const_stream = true; + ((Stream *)sin2)->pt_stream = true; stream_emulator_make_memref_add_plaintext_lwe_ciphertext_u64_process( dfg, sin1, sin2, sout); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void 
stream_emulator_make_memref_batched_mul_cleartext_lwe_ciphertext_u64_process( void *dfg, void *sin1, void *sin2, void *sout) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->batched_stream = true; + ((Stream *)sin2)->pt_stream = true; stream_emulator_make_memref_mul_cleartext_lwe_ciphertext_u64_process( dfg, sin1, sin2, sout); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_mul_cleartext_cst_lwe_ciphertext_u64_process( void *dfg, void *sin1, void *sin2, void *sout) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->const_stream = true; + ((Stream *)sin2)->pt_stream = true; stream_emulator_make_memref_mul_cleartext_lwe_ciphertext_u64_process( dfg, sin1, sin2, sout); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_negate_lwe_ciphertext_u64_process( void *dfg, void *sin1, void *sout) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; stream_emulator_make_memref_negate_lwe_ciphertext_u64_process(dfg, sin1, sout); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_keyswitch_lwe_u64_process( void *dfg, void *sin1, void *sout, uint32_t level, uint32_t base_log, - uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t ksk_index, - uint32_t output_size, void *context) { + uint32_t input_lwe_dim, uint32_t output_lwe_dim, uint32_t output_size, + uint32_t ksk_index, void *context) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; stream_emulator_make_memref_keyswitch_lwe_u64_process( dfg, sin1, sout, level, base_log, input_lwe_dim, output_lwe_dim, - ksk_index, output_size, context); + output_size, ksk_index, context); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_bootstrap_lwe_u64_process( void *dfg, void *sin1, void *sin2, void *sout, uint32_t input_lwe_dim, uint32_t poly_size, uint32_t level, uint32_t base_log, uint32_t glwe_dim, - uint32_t bsk_index, uint32_t output_size, void *context) { + uint32_t output_size, uint32_t bsk_index, void *context) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->const_stream = true; stream_emulator_make_memref_bootstrap_lwe_u64_process( dfg, sin1, sin2, sout, input_lwe_dim, poly_size, level, base_log, - glwe_dim, bsk_index, output_size, context); + glwe_dim, output_size, bsk_index, context); + ((Stream *)sout)->batched_stream = true; + ((Stream *)sout)->ct_stream = true; } void stream_emulator_make_memref_batched_mapped_bootstrap_lwe_u64_process( void *dfg, void *sin1, void *sin2, void *sout, uint32_t input_lwe_dim, uint32_t poly_size, uint32_t level, uint32_t base_log, uint32_t glwe_dim, - uint32_t bsk_index, uint32_t output_size, void *context) { + uint32_t output_size, uint32_t bsk_index, void *context) { + ((Stream *)sin1)->batched_stream = true; + ((Stream *)sin1)->ct_stream = true; + ((Stream *)sin2)->batched_stream = true; + ((Stream *)sin2)->ct_stream = true; stream_emulator_make_memref_bootstrap_lwe_u64_process( dfg, sin1, sin2, sout, input_lwe_dim, poly_size, level, base_log, - glwe_dim, bsk_index, output_size, context); + glwe_dim, output_size, bsk_index, context); + ((Stream *)sout)->batched_stream = true; + ((Stream 
*)sout)->ct_stream = true; } void *stream_emulator_make_uint64_stream(const char *name, stream_type stype) { @@ -703,6 +1345,7 @@ uint64_t stream_emulator_get_uint64(void *stream) { auto s = (Stream *)stream; MemRef2 m = {&res, &res, 0, {1, 1}, {1, 1}}; s->get_on_host(m); + s->eager_dependence_deallocation(); return res; } @@ -730,6 +1373,7 @@ void stream_emulator_get_memref(void *stream, uint64_t *out_allocated, {out_size, out_stride}}; auto s = (Stream *)stream; s->get_on_host(mref); + s->eager_dependence_deallocation(); } void *stream_emulator_make_memref_batch_stream(const char *name, @@ -760,6 +1404,7 @@ void stream_emulator_get_memref_batch(void *stream, uint64_t *out_allocated, {out_stride0, out_stride1}}; auto s = (Stream *)stream; s->get_on_host(mref); + s->eager_dependence_deallocation(); } void *stream_emulator_init() { @@ -768,6 +1413,37 @@ void *stream_emulator_init() { assert(cudaGetDeviceCount(&num) == cudaSuccess); num_devices = num; } + + char *env = getenv("SDFG_NUM_GPUS"); + if (env != nullptr) { + size_t requested_gpus = strtoul(env, NULL, 10); + if (requested_gpus > num_devices) + warnx("WARNING: requested more GPUs (%lu) than available (%lu) - " + "continuing with available devices.", + requested_gpus, num_devices); + else + num_devices = requested_gpus; + } + env = getenv("SDFG_DISTRIBUTE_BATCH_OPS"); + if (env != nullptr && (!strncmp(env, "off", 3) || !strncmp(env, "OFF", 3) || + !strncmp(env, "0", 1))) { + dont_distribute_batched_ops = true; + } + assert(num_devices > 0 && "No GPUs available on system."); + + hwloc_topology_t topology; + hwloc_topology_init(&topology); + hwloc_topology_set_all_types_filter(topology, HWLOC_TYPE_FILTER_KEEP_NONE); + hwloc_topology_set_type_filter(topology, HWLOC_OBJ_CORE, + HWLOC_TYPE_FILTER_KEEP_ALL); + hwloc_topology_load(topology); + num_cores = hwloc_get_nbobjs_by_type(topology, HWLOC_OBJ_CORE); + env = getenv("OMP_NUM_THREADS"); + if (env != nullptr) + num_cores = strtoul(env, NULL, 10); + if (num_cores < 1) + num_cores = 1; + int device = next_device.fetch_add(1) % num_devices; return new GPU_DFG(device); }
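For reference, the environment handling and round-robin default-device choice added to stream_emulator_init() can be summarised by the following standalone sketch (hypothetical helper names, simplified error handling, not part of the patch):

#include <atomic>
#include <cstdlib>
#include <cstring>

static std::atomic<size_t> rr_next_device{0};

// SDFG_NUM_GPUS caps the number of devices used; requests above the
// detected count fall back to what is available (the patch also warns).
static size_t effective_num_gpus(size_t detected) {
  if (const char *env = std::getenv("SDFG_NUM_GPUS")) {
    size_t requested = std::strtoul(env, nullptr, 10);
    if (requested >= 1 && requested <= detected)
      return requested;
  }
  return detected;
}

// SDFG_DISTRIBUTE_BATCH_OPS set to "off", "OFF" or "0" disables
// distributing batched ops across several GPUs.
static bool batch_distribution_disabled() {
  const char *env = std::getenv("SDFG_DISTRIBUTE_BATCH_OPS");
  return env != nullptr && (std::strncmp(env, "off", 3) == 0 ||
                            std::strncmp(env, "OFF", 3) == 0 ||
                            std::strncmp(env, "0", 1) == 0);
}

// Each newly created DFG picks the next device in round-robin order.
static size_t pick_default_device(size_t num_gpus) {
  return rr_next_device.fetch_add(1) % num_gpus;
}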