mirror of
https://github.com/zama-ai/concrete.git
synced 2026-02-08 11:35:02 -05:00
feat(compiler): add task creation using vectors of futures as inputs and outputs.
This commit is contained in:
@@ -0,0 +1,139 @@
|
||||
// Part of the Concrete Compiler Project, under the BSD3 License with Zama
|
||||
// Exceptions. See
|
||||
// https://github.com/zama-ai/concrete-compiler-internal/blob/main/LICENSE.txt
|
||||
// for license information.
|
||||
|
||||
#ifndef CONCRETELANG_DFR_TASKS_HPP
|
||||
#define CONCRETELANG_DFR_TASKS_HPP
|
||||
#ifdef CONCRETELANG_DATAFLOW_EXECUTION_ENABLED
|
||||
|
||||
namespace mlir {
|
||||
namespace concretelang {
|
||||
namespace dfr {
|
||||
|
||||
using namespace hpx;
|
||||
typedef struct dfr_refcounted_future {
|
||||
hpx::shared_future<void *> *future;
|
||||
std::atomic<std::size_t> count;
|
||||
bool cloned_memref_p;
|
||||
dfr_refcounted_future(hpx::shared_future<void *> *f, size_t c, bool clone_p)
|
||||
: future(f), count(c), cloned_memref_p(clone_p) {}
|
||||
} dfr_refcounted_future_t, *dfr_refcounted_future_p;
|
||||
|
||||
// Determine where new task should run. For now just round-robin
|
||||
// distribution - TODO: optimise.
|
||||
static inline size_t dfr_get_next_execution_locality() {
|
||||
static std::atomic<std::size_t> next_locality{1};
|
||||
|
||||
size_t next_loc = next_locality.fetch_add(1);
|
||||
|
||||
return next_loc % num_nodes;
|
||||
}
|
||||
|
||||
/// Create an asynchronous dataflow task from vectors of input futures and
/// output slots.
///
/// The task is split in two parts: an execution body, scheduled once all
/// input dependences are satisfied, which yields a future on the full set
/// of outputs; and a continuation that splits that result into one future
/// per output so that each return value can be synchronised independently.
///
/// Declared `inline` because the definition lives in a header: without it,
/// including this header from more than one translation unit would produce
/// multiple-definition link errors.
///
/// \param wfn                 work function; it is resolved to a name through
///                            the node-level work function registry.
/// \param ctx                 runtime context pointer. Not referenced in the
///                            visible body - presumably consumed by the
///                            generated input cases (TODO confirm).
/// \param refcounted_futures  input futures, each a dfr_refcounted_future_p
///                            passed as `void *`. A reference is taken on
///                            each one here and released with
///                            _dfr_deallocate_future once the task outputs
///                            are available.
/// \param param_sizes         per-input sizes (presumably used by the
///                            generated input cases - TODO confirm).
/// \param param_types         per-input type tags (same caveat as above).
/// \param outputs             output slots; each element is treated as a
///                            `void **` and receives a newly allocated
///                            dfr_refcounted_future_t with a count of 1.
/// \param output_sizes        per-output sizes (not referenced in the
///                            visible body).
/// \param output_types        per-output type tags; an output whose tag is
///                            _DFR_TASK_ARG_MEMREF gets cloned_memref_p set.
///
/// Raises an HPX exception (hpx::no_success) when the number of inputs is
/// not covered by the generated cases or when the number of outputs is not
/// 1, 2 or 3.
inline void dfr_create_async_task_impl(wfnptr wfn, void *ctx,
                                       std::vector<void *> &refcounted_futures,
                                       std::vector<size_t> &param_sizes,
                                       std::vector<uint64_t> &param_types,
                                       std::vector<void *> &outputs,
                                       std::vector<size_t> &output_sizes,
                                       std::vector<uint64_t> &output_types) {
  // Take a reference on each future argument
  for (auto rcf : refcounted_futures)
    ((dfr_refcounted_future_p)rcf)->count.fetch_add(1);

  // We pass functions by name - which is not strictly necessary in
  // shared memory as pointers suffice, but is needed in the
  // distributed case where the functions need to be located/loaded on
  // the node.
  auto wfnname =
      _dfr_node_level_work_function_registry->getWorkFunctionName((void *)wfn);
  hpx::future<hpx::future<OpaqueOutputData>> oodf;

  // In order to allow complete dataflow semantics for
  // communication/synchronization, we split tasks in two parts: an
  // execution body that is scheduled once all input dependences are
  // satisfied, which generates a future on a tuple of outputs, which
  // is then further split into a tuple of futures and provide
  // individual synchronization for each return independently.
  GenericComputeClient *gcc_target = &gcc[dfr_get_next_execution_locality()];

  // The case labels are generated, one per supported input count. They are
  // included textually, so the local names above (wfnname, oodf,
  // gcc_target, ...) must keep their spelling.
  switch (refcounted_futures.size()) {

#include "concretelang/Runtime/generated/dfr_dataflow_inputs_cases.h"

  default:
    HPX_THROW_EXCEPTION(hpx::no_success, "_dfr_create_async_task",
                        "Error: number of task parameters not supported.");
  }

  // Every continuation below captures refcounted_futures by value: it runs
  // after this function has returned, so it cannot refer to the caller's
  // vector.
  switch (outputs.size()) {
  case 1:
    *((void **)outputs[0]) = (void *)new dfr_refcounted_future_t(
        new hpx::shared_future<void *>(hpx::dataflow(
            [refcounted_futures](
                hpx::future<OpaqueOutputData> oodf_in) -> void * {
              void *ret = oodf_in.get().outputs[0];
              // Drop the references taken on the inputs at task creation.
              for (auto rcf : refcounted_futures)
                _dfr_deallocate_future(rcf);
              return ret;
            },
            oodf)),
        1, output_types[0] == _DFR_TASK_ARG_MEMREF);
    break;

  case 2: {
    hpx::future<hpx::tuple<void *, void *>> &&ft = hpx::dataflow(
        [refcounted_futures](hpx::future<OpaqueOutputData> oodf_in)
            -> hpx::tuple<void *, void *> {
          std::vector<void *> outputs = std::move(oodf_in.get().outputs);
          // Drop the references taken on the inputs at task creation.
          for (auto rcf : refcounted_futures)
            _dfr_deallocate_future(rcf);
          return hpx::make_tuple<>(outputs[0], outputs[1]);
        },
        oodf);
    // Turn the future-of-tuple into a tuple of independent futures.
    hpx::tuple<hpx::future<void *>, hpx::future<void *>> &&tf =
        hpx::split_future(std::move(ft));
    *((void **)outputs[0]) = (void *)new dfr_refcounted_future_t(
        new hpx::shared_future<void *>(std::move(hpx::get<0>(tf))), 1,
        output_types[0] == _DFR_TASK_ARG_MEMREF);
    *((void **)outputs[1]) = (void *)new dfr_refcounted_future_t(
        new hpx::shared_future<void *>(std::move(hpx::get<1>(tf))), 1,
        output_types[1] == _DFR_TASK_ARG_MEMREF);
    break;
  }

  case 3: {
    hpx::future<hpx::tuple<void *, void *, void *>> &&ft = hpx::dataflow(
        [refcounted_futures](hpx::future<OpaqueOutputData> oodf_in)
            -> hpx::tuple<void *, void *, void *> {
          std::vector<void *> outputs = std::move(oodf_in.get().outputs);
          // Drop the references taken on the inputs at task creation.
          for (auto rcf : refcounted_futures)
            _dfr_deallocate_future(rcf);
          return hpx::make_tuple<>(outputs[0], outputs[1], outputs[2]);
        },
        oodf);
    // Turn the future-of-tuple into a tuple of independent futures.
    hpx::tuple<hpx::future<void *>, hpx::future<void *>, hpx::future<void *>>
        &&tf = hpx::split_future(std::move(ft));
    *((void **)outputs[0]) = (void *)new dfr_refcounted_future_t(
        new hpx::shared_future<void *>(std::move(hpx::get<0>(tf))), 1,
        output_types[0] == _DFR_TASK_ARG_MEMREF);
    *((void **)outputs[1]) = (void *)new dfr_refcounted_future_t(
        new hpx::shared_future<void *>(std::move(hpx::get<1>(tf))), 1,
        output_types[1] == _DFR_TASK_ARG_MEMREF);
    *((void **)outputs[2]) = (void *)new dfr_refcounted_future_t(
        new hpx::shared_future<void *>(std::move(hpx::get<2>(tf))), 1,
        output_types[2] == _DFR_TASK_ARG_MEMREF);
    break;
  }

  default:
    HPX_THROW_EXCEPTION(hpx::no_success, "_dfr_create_async_task",
                        "Error: number of task outputs not supported.");
  }
}
|
||||
|
||||
} // namespace dfr
|
||||
} // namespace concretelang
|
||||
} // namespace mlir
|
||||
|
||||
#endif
|
||||
#endif
|
||||
@@ -23,7 +23,6 @@ void *_dfr_await_future(void *);
|
||||
_dfr_make_ready_future allocates the future, not the underlying storage.
|
||||
_dfr_create_async_task allocates both future and storage for outputs. */
|
||||
void _dfr_deallocate_future(void *);
|
||||
void _dfr_deallocate_future_data(void *);
|
||||
|
||||
/* Initialisation & termination. */
|
||||
void _dfr_start(int64_t, void *);
|
||||
|
||||
@@ -40,15 +40,9 @@ static struct timespec init_timer, broadcast_timer, compute_timer, whole_timer;
|
||||
} // namespace concretelang
|
||||
} // namespace mlir
|
||||
|
||||
#include "concretelang/Runtime/dfr_tasks.hpp"
|
||||
using namespace hpx;
|
||||
|
||||
typedef struct dfr_refcounted_future {
|
||||
hpx::shared_future<void *> *future;
|
||||
std::atomic<std::size_t> count;
|
||||
bool cloned_memref_p;
|
||||
dfr_refcounted_future(hpx::shared_future<void *> *f, size_t c, bool clone_p)
|
||||
: future(f), count(c), cloned_memref_p(clone_p) {}
|
||||
} dfr_refcounted_future_t, *dfr_refcounted_future_p;
|
||||
using namespace mlir::concretelang::dfr;
|
||||
|
||||
// Ready futures are only used as inputs to tasks (never passed to
|
||||
// await_future), so we only need to track the references in task
|
||||
@@ -78,18 +72,6 @@ void _dfr_deallocate_future(void *in) {
|
||||
}
|
||||
}
|
||||
|
||||
void _dfr_deallocate_future_data(void *in) {}
|
||||
|
||||
// Determine where new task should run. For now just round-robin
|
||||
// distribution - TODO: optimise.
|
||||
static inline size_t _dfr_find_next_execution_locality() {
|
||||
static std::atomic<std::size_t> next_locality{1};
|
||||
|
||||
size_t next_loc = next_locality.fetch_add(1);
|
||||
|
||||
return next_loc % mlir::concretelang::dfr::num_nodes;
|
||||
}
|
||||
|
||||
/// Runtime generic async_task. Each first NUM_PARAMS pairs of
|
||||
/// arguments in the variadic list corresponds to a void* pointer on a
|
||||
/// hpx::future<void*> and the size of data within the future. After
|
||||
@@ -118,102 +100,57 @@ void _dfr_create_async_task(wfnptr wfn, void *ctx, size_t num_params,
|
||||
}
|
||||
va_end(args);
|
||||
|
||||
// Take a reference on each future argument
|
||||
for (auto rcf : refcounted_futures)
|
||||
((dfr_refcounted_future_p)rcf)->count.fetch_add(1);
|
||||
dfr_create_async_task_impl(wfn, ctx, refcounted_futures, param_sizes,
|
||||
param_types, outputs, output_sizes, output_types);
|
||||
}
|
||||
|
||||
// We pass functions by name - which is not strictly necessary in
|
||||
// shared memory as pointers suffice, but is needed in the
|
||||
// distributed case where the functions need to be located/loaded on
|
||||
// the node.
|
||||
auto wfnname = mlir::concretelang::dfr::_dfr_node_level_work_function_registry
|
||||
->getWorkFunctionName((void *)wfn);
|
||||
hpx::future<hpx::future<mlir::concretelang::dfr::OpaqueOutputData>> oodf;
|
||||
/// Runtime generic async_task with vector parameters. Each first
|
||||
/// NUM_OUTPUTS quadruplets of arguments in the variadic list
|
||||
/// corresponds to a size_t for the number of elements in the
|
||||
/// following array, a void * pointer on an array of
|
||||
/// hpx::future<void*> and two size_t parameters for the size and type
|
||||
/// of each output. After that come NUM_PARAMS quadruplets of
|
||||
/// arguments in the variadic list that correspond to a size_t for the
|
||||
/// number of elements in the following array, a void* pointer on an
|
||||
/// array of hpx::future<void*> and the same two size_t parameters
|
||||
/// (size and type).
|
||||
void _dfr_create_async_task_vec(wfnptr wfn, void *ctx, size_t num_params,
|
||||
size_t num_outputs, ...) {
|
||||
std::vector<void *> refcounted_futures;
|
||||
std::vector<size_t> param_sizes;
|
||||
std::vector<uint64_t> param_types;
|
||||
std::vector<void *> outputs;
|
||||
std::vector<size_t> output_sizes;
|
||||
std::vector<uint64_t> output_types;
|
||||
|
||||
// In order to allow complete dataflow semantics for
|
||||
// communication/synchronization, we split tasks in two parts: an
|
||||
// execution body that is scheduled once all input dependences are
|
||||
// satisfied, which generates a future on a tuple of outputs, which
|
||||
// is then further split into a tuple of futures and provide
|
||||
// individual synchronization for each return independently.
|
||||
mlir::concretelang::dfr::GenericComputeClient *gcc_target =
|
||||
&mlir::concretelang::dfr::gcc[_dfr_find_next_execution_locality()];
|
||||
switch (num_params) {
|
||||
|
||||
#include "concretelang/Runtime/generated/dfr_dataflow_inputs_cases.h"
|
||||
|
||||
default:
|
||||
HPX_THROW_EXCEPTION(hpx::no_success, "_dfr_create_async_task",
|
||||
"Error: number of task parameters not supported.");
|
||||
va_list args;
|
||||
va_start(args, num_outputs);
|
||||
for (size_t i = 0; i < num_outputs; ++i) {
|
||||
size_t count = va_arg(args, uint64_t);
|
||||
void **futures = va_arg(args, void **);
|
||||
size_t sizes = va_arg(args, uint64_t);
|
||||
size_t types = va_arg(args, uint64_t);
|
||||
for (size_t j = 0; j < count; ++j) {
|
||||
outputs.push_back(futures[j]);
|
||||
output_sizes.push_back(sizes);
|
||||
output_types.push_back(types);
|
||||
}
|
||||
}
|
||||
|
||||
switch (num_outputs) {
|
||||
case 1:
|
||||
*((void **)outputs[0]) = (void *)new dfr_refcounted_future_t(
|
||||
new hpx::shared_future<void *>(hpx::dataflow(
|
||||
[refcounted_futures](
|
||||
hpx::future<mlir::concretelang::dfr::OpaqueOutputData> oodf_in)
|
||||
-> void * {
|
||||
void *ret = oodf_in.get().outputs[0];
|
||||
for (auto rcf : refcounted_futures)
|
||||
_dfr_deallocate_future(rcf);
|
||||
return ret;
|
||||
},
|
||||
oodf)),
|
||||
1, output_types[0] == mlir::concretelang::dfr::_DFR_TASK_ARG_MEMREF);
|
||||
break;
|
||||
|
||||
case 2: {
|
||||
hpx::future<hpx::tuple<void *, void *>> &&ft = hpx::dataflow(
|
||||
[refcounted_futures](
|
||||
hpx::future<mlir::concretelang::dfr::OpaqueOutputData> oodf_in)
|
||||
-> hpx::tuple<void *, void *> {
|
||||
std::vector<void *> outputs = std::move(oodf_in.get().outputs);
|
||||
for (auto rcf : refcounted_futures)
|
||||
_dfr_deallocate_future(rcf);
|
||||
return hpx::make_tuple<>(outputs[0], outputs[1]);
|
||||
},
|
||||
oodf);
|
||||
hpx::tuple<hpx::future<void *>, hpx::future<void *>> &&tf =
|
||||
hpx::split_future(std::move(ft));
|
||||
*((void **)outputs[0]) = (void *)new dfr_refcounted_future_t(
|
||||
new hpx::shared_future<void *>(std::move(hpx::get<0>(tf))), 1,
|
||||
output_types[0] == mlir::concretelang::dfr::_DFR_TASK_ARG_MEMREF);
|
||||
*((void **)outputs[1]) = (void *)new dfr_refcounted_future_t(
|
||||
new hpx::shared_future<void *>(std::move(hpx::get<1>(tf))), 1,
|
||||
output_types[1] == mlir::concretelang::dfr::_DFR_TASK_ARG_MEMREF);
|
||||
break;
|
||||
for (size_t i = 0; i < num_params; ++i) {
|
||||
size_t count = va_arg(args, uint64_t);
|
||||
void **futures = va_arg(args, void **);
|
||||
size_t sizes = va_arg(args, uint64_t);
|
||||
size_t types = va_arg(args, uint64_t);
|
||||
for (size_t j = 0; j < count; ++j) {
|
||||
refcounted_futures.push_back(futures[j]);
|
||||
param_sizes.push_back(sizes);
|
||||
param_types.push_back(types);
|
||||
}
|
||||
}
|
||||
va_end(args);
|
||||
|
||||
case 3: {
|
||||
hpx::future<hpx::tuple<void *, void *, void *>> &&ft = hpx::dataflow(
|
||||
[refcounted_futures](
|
||||
hpx::future<mlir::concretelang::dfr::OpaqueOutputData> oodf_in)
|
||||
-> hpx::tuple<void *, void *, void *> {
|
||||
std::vector<void *> outputs = std::move(oodf_in.get().outputs);
|
||||
for (auto rcf : refcounted_futures)
|
||||
_dfr_deallocate_future(rcf);
|
||||
return hpx::make_tuple<>(outputs[0], outputs[1], outputs[2]);
|
||||
},
|
||||
oodf);
|
||||
hpx::tuple<hpx::future<void *>, hpx::future<void *>, hpx::future<void *>>
|
||||
&&tf = hpx::split_future(std::move(ft));
|
||||
*((void **)outputs[0]) = (void *)new dfr_refcounted_future_t(
|
||||
new hpx::shared_future<void *>(std::move(hpx::get<0>(tf))), 1,
|
||||
output_types[0] == mlir::concretelang::dfr::_DFR_TASK_ARG_MEMREF);
|
||||
*((void **)outputs[1]) = (void *)new dfr_refcounted_future_t(
|
||||
new hpx::shared_future<void *>(std::move(hpx::get<1>(tf))), 1,
|
||||
output_types[1] == mlir::concretelang::dfr::_DFR_TASK_ARG_MEMREF);
|
||||
*((void **)outputs[2]) = (void *)new dfr_refcounted_future_t(
|
||||
new hpx::shared_future<void *>(std::move(hpx::get<2>(tf))), 1,
|
||||
output_types[2] == mlir::concretelang::dfr::_DFR_TASK_ARG_MEMREF);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
HPX_THROW_EXCEPTION(hpx::no_success, "_dfr_create_async_task",
|
||||
"Error: number of task outputs not supported.");
|
||||
}
|
||||
dfr_create_async_task_impl(wfn, ctx, refcounted_futures, param_sizes,
|
||||
param_types, outputs, output_sizes, output_types);
|
||||
}
|
||||
|
||||
/***************************/
|
||||
@@ -231,26 +168,23 @@ static bool use_omp_p = false;
|
||||
} // namespace
|
||||
|
||||
void _dfr_set_required(bool is_required) {
|
||||
mlir::concretelang::dfr::dfr_required_p = is_required;
|
||||
if (mlir::concretelang::dfr::dfr_required_p) {
|
||||
dfr_required_p = is_required;
|
||||
if (dfr_required_p) {
|
||||
_dfr_try_initialize();
|
||||
}
|
||||
}
|
||||
void _dfr_set_jit(bool is_jit) { mlir::concretelang::dfr::is_jit_p = is_jit; }
|
||||
void _dfr_set_use_omp(bool use_omp) {
|
||||
mlir::concretelang::dfr::use_omp_p = use_omp;
|
||||
}
|
||||
bool _dfr_is_jit() { return mlir::concretelang::dfr::is_jit_p; }
|
||||
bool _dfr_is_root_node() { return mlir::concretelang::dfr::is_root_node_p; }
|
||||
bool _dfr_use_omp() { return mlir::concretelang::dfr::use_omp_p; }
|
||||
void _dfr_set_jit(bool is_jit) { is_jit_p = is_jit; }
|
||||
void _dfr_set_use_omp(bool use_omp) { use_omp_p = use_omp; }
|
||||
bool _dfr_is_jit() { return is_jit_p; }
|
||||
bool _dfr_is_root_node() { return is_root_node_p; }
|
||||
bool _dfr_use_omp() { return use_omp_p; }
|
||||
bool _dfr_is_distributed() { return num_nodes > 1; }
|
||||
} // namespace dfr
|
||||
} // namespace concretelang
|
||||
} // namespace mlir
|
||||
|
||||
void _dfr_register_work_function(wfnptr wfn) {
|
||||
mlir::concretelang::dfr::_dfr_node_level_work_function_registry
|
||||
->getWorkFunctionName((void *)wfn);
|
||||
_dfr_node_level_work_function_registry->getWorkFunctionName((void *)wfn);
|
||||
}
|
||||
|
||||
/************************************/
|
||||
@@ -269,25 +203,25 @@ static uint64_t terminated = 2;
|
||||
} // namespace concretelang
|
||||
} // namespace mlir
|
||||
static inline void _dfr_stop_impl() {
|
||||
if (mlir::concretelang::dfr::_dfr_is_root_node())
|
||||
if (_dfr_is_root_node())
|
||||
hpx::apply([]() { hpx::finalize(); });
|
||||
hpx::stop();
|
||||
if (!mlir::concretelang::dfr::_dfr_is_root_node())
|
||||
if (!_dfr_is_root_node())
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
static inline void _dfr_start_impl(int argc, char *argv[]) {
|
||||
BEGIN_TIME(&mlir::concretelang::dfr::init_timer);
|
||||
mlir::concretelang::dfr::dl_handle = dlopen(nullptr, RTLD_NOW);
|
||||
BEGIN_TIME(&init_timer);
|
||||
dl_handle = dlopen(nullptr, RTLD_NOW);
|
||||
|
||||
// If OpenMP is to be used, we need to force its initialization
|
||||
// before thread binding occurs. Otherwise OMP threads will be bound
|
||||
// to the core of the thread initializing the OMP runtime.
|
||||
if (mlir::concretelang::dfr::_dfr_use_omp()) {
|
||||
#pragma omp parallel shared(mlir::concretelang::dfr::use_omp_p)
|
||||
if (_dfr_use_omp()) {
|
||||
#pragma omp parallel shared(use_omp_p)
|
||||
{
|
||||
#pragma omp critical
|
||||
mlir::concretelang::dfr::use_omp_p = true;
|
||||
use_omp_p = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,9 +246,9 @@ static inline void _dfr_start_impl(int argc, char *argv[]) {
|
||||
// the choices made by the OpenMP runtime if we would be mixing
|
||||
// loop & dataflow parallelism.
|
||||
char *env = getenv("OMP_NUM_THREADS");
|
||||
if (mlir::concretelang::dfr::_dfr_use_omp() && env != nullptr)
|
||||
if (_dfr_use_omp() && env != nullptr)
|
||||
nOMPThreads = strtoul(env, NULL, 10);
|
||||
else if (mlir::concretelang::dfr::_dfr_use_omp())
|
||||
else if (_dfr_use_omp())
|
||||
nOMPThreads = nCores;
|
||||
else
|
||||
nOMPThreads = 1;
|
||||
@@ -379,79 +313,66 @@ static inline void _dfr_start_impl(int argc, char *argv[]) {
|
||||
}
|
||||
|
||||
// Instantiate and initialise on each node
|
||||
mlir::concretelang::dfr::is_root_node_p =
|
||||
(hpx::find_here() == hpx::find_root_locality());
|
||||
mlir::concretelang::dfr::num_nodes = hpx::get_num_localities().get();
|
||||
is_root_node_p = (hpx::find_here() == hpx::find_root_locality());
|
||||
num_nodes = hpx::get_num_localities().get();
|
||||
|
||||
new mlir::concretelang::dfr::WorkFunctionRegistry();
|
||||
mlir::concretelang::dfr::_dfr_jit_phase_barrier = new hpx::lcos::barrier(
|
||||
"phase_barrier", mlir::concretelang::dfr::num_nodes,
|
||||
hpx::get_locality_id());
|
||||
mlir::concretelang::dfr::_dfr_startup_barrier = new hpx::lcos::barrier(
|
||||
"startup_barrier", mlir::concretelang::dfr::num_nodes,
|
||||
hpx::get_locality_id());
|
||||
new WorkFunctionRegistry();
|
||||
_dfr_jit_phase_barrier = new hpx::lcos::barrier("phase_barrier", num_nodes,
|
||||
hpx::get_locality_id());
|
||||
_dfr_startup_barrier = new hpx::lcos::barrier("startup_barrier", num_nodes,
|
||||
hpx::get_locality_id());
|
||||
|
||||
if (mlir::concretelang::dfr::_dfr_is_root_node()) {
|
||||
if (_dfr_is_root_node()) {
|
||||
// Create compute server components on each node - from the root
|
||||
// node only - and the corresponding compute client on the root
|
||||
// node.
|
||||
mlir::concretelang::dfr::gcc =
|
||||
hpx::new_<mlir::concretelang::dfr::GenericComputeClient[]>(
|
||||
hpx::default_layout(hpx::find_all_localities()),
|
||||
mlir::concretelang::dfr::num_nodes)
|
||||
.get();
|
||||
gcc = hpx::new_<GenericComputeClient[]>(
|
||||
hpx::default_layout(hpx::find_all_localities()), num_nodes)
|
||||
.get();
|
||||
}
|
||||
END_TIME(&mlir::concretelang::dfr::init_timer, "Initialization");
|
||||
END_TIME(&init_timer, "Initialization");
|
||||
}
|
||||
|
||||
/* Start/stop functions to be called from within user code (or during
|
||||
JIT invocation). These serve to pause/resume the runtime
|
||||
scheduler and to clean up used resources. */
|
||||
void _dfr_start(int64_t use_dfr_p, void *ctx) {
|
||||
BEGIN_TIME(&mlir::concretelang::dfr::whole_timer);
|
||||
BEGIN_TIME(&whole_timer);
|
||||
if (use_dfr_p) {
|
||||
// The first invocation will initialise the runtime. As each call to
|
||||
// _dfr_start is matched with _dfr_stop, if this is not the first,
|
||||
// we need to resume the HPX runtime.
|
||||
assert(mlir::concretelang::dfr::init_guard !=
|
||||
mlir::concretelang::dfr::terminated &&
|
||||
assert(init_guard != terminated &&
|
||||
"DFR runtime: attempting to start runtime after it has been "
|
||||
"terminated");
|
||||
uint64_t expected = mlir::concretelang::dfr::uninitialised;
|
||||
if (mlir::concretelang::dfr::init_guard.compare_exchange_strong(
|
||||
expected, mlir::concretelang::dfr::active))
|
||||
uint64_t expected = uninitialised;
|
||||
if (init_guard.compare_exchange_strong(expected, active))
|
||||
_dfr_start_impl(0, nullptr);
|
||||
|
||||
assert(mlir::concretelang::dfr::init_guard ==
|
||||
mlir::concretelang::dfr::active &&
|
||||
"DFR runtime failed to initialise");
|
||||
assert(init_guard == active && "DFR runtime failed to initialise");
|
||||
|
||||
// If this is not the root node in a non-JIT execution, then this
|
||||
// node should only run the scheduler for any incoming work until
|
||||
// termination is flagged. If this is JIT, we need to run the
|
||||
// cancelled function which registers the work functions.
|
||||
if (!mlir::concretelang::dfr::_dfr_is_root_node() &&
|
||||
!mlir::concretelang::dfr::_dfr_is_jit())
|
||||
if (!_dfr_is_root_node() && !_dfr_is_jit())
|
||||
_dfr_stop_impl();
|
||||
}
|
||||
|
||||
// If DFR is used and a runtime context is needed, and execution is
|
||||
// distributed, then broadcast from root to all compute nodes.
|
||||
if (use_dfr_p && (mlir::concretelang::dfr::num_nodes > 1) &&
|
||||
(ctx || !mlir::concretelang::dfr::_dfr_is_root_node())) {
|
||||
BEGIN_TIME(&mlir::concretelang::dfr::broadcast_timer);
|
||||
new mlir::concretelang::dfr::RuntimeContextManager();
|
||||
mlir::concretelang::dfr::_dfr_node_level_runtime_context_manager
|
||||
->setContext(ctx);
|
||||
if (use_dfr_p && (num_nodes > 1) && (ctx || !_dfr_is_root_node())) {
|
||||
BEGIN_TIME(&broadcast_timer);
|
||||
new RuntimeContextManager();
|
||||
_dfr_node_level_runtime_context_manager->setContext(ctx);
|
||||
|
||||
// If this is not JIT, then the remote nodes never reach _dfr_stop,
|
||||
// so root should not instantiate this barrier.
|
||||
if (mlir::concretelang::dfr::_dfr_is_root_node() &&
|
||||
mlir::concretelang::dfr::_dfr_is_jit())
|
||||
mlir::concretelang::dfr::_dfr_startup_barrier->wait();
|
||||
END_TIME(&mlir::concretelang::dfr::broadcast_timer, "Key broadcasting");
|
||||
if (_dfr_is_root_node() && _dfr_is_jit())
|
||||
_dfr_startup_barrier->wait();
|
||||
END_TIME(&broadcast_timer, "Key broadcasting");
|
||||
}
|
||||
BEGIN_TIME(&mlir::concretelang::dfr::compute_timer);
|
||||
BEGIN_TIME(&compute_timer);
|
||||
}
|
||||
|
||||
// This function cannot be used to terminate the runtime as it is
|
||||
@@ -460,11 +381,11 @@ void _dfr_start(int64_t use_dfr_p, void *ctx) {
|
||||
// called on exit from "main" when not using the main wrapper library.
|
||||
void _dfr_stop(int64_t use_dfr_p) {
|
||||
if (use_dfr_p) {
|
||||
if (mlir::concretelang::dfr::num_nodes > 1) {
|
||||
if (num_nodes > 1) {
|
||||
// Non-root nodes synchronize here with the root to mark the point
|
||||
// where the root is free to send work out (only needed in JIT).
|
||||
if (!mlir::concretelang::dfr::_dfr_is_root_node())
|
||||
mlir::concretelang::dfr::_dfr_startup_barrier->wait();
|
||||
if (!_dfr_is_root_node())
|
||||
_dfr_startup_barrier->wait();
|
||||
|
||||
// The barrier is only needed to synchronize the different
|
||||
// computation phases when the compute nodes need to generate and
|
||||
@@ -475,41 +396,33 @@ void _dfr_stop(int64_t use_dfr_p) {
|
||||
// gain as the root node would be waiting for the end of computation
|
||||
// on all remote nodes before reaching here anyway (dataflow
|
||||
// dependences).
|
||||
if (mlir::concretelang::dfr::_dfr_is_jit()) {
|
||||
mlir::concretelang::dfr::_dfr_jit_phase_barrier->wait();
|
||||
if (_dfr_is_jit()) {
|
||||
_dfr_jit_phase_barrier->wait();
|
||||
}
|
||||
|
||||
mlir::concretelang::dfr::_dfr_node_level_runtime_context_manager
|
||||
->clearContext();
|
||||
_dfr_node_level_runtime_context_manager->clearContext();
|
||||
}
|
||||
}
|
||||
END_TIME(&mlir::concretelang::dfr::compute_timer, "Compute");
|
||||
END_TIME(&mlir::concretelang::dfr::whole_timer, "Total execution");
|
||||
END_TIME(&compute_timer, "Compute");
|
||||
END_TIME(&whole_timer, "Total execution");
|
||||
}
|
||||
|
||||
void _dfr_try_initialize() {
|
||||
// Initialize and immediately suspend the HPX runtime if not yet done.
|
||||
uint64_t expected = mlir::concretelang::dfr::uninitialised;
|
||||
if (mlir::concretelang::dfr::init_guard.compare_exchange_strong(
|
||||
expected, mlir::concretelang::dfr::active)) {
|
||||
uint64_t expected = uninitialised;
|
||||
if (init_guard.compare_exchange_strong(expected, active)) {
|
||||
_dfr_start_impl(0, nullptr);
|
||||
}
|
||||
|
||||
assert(mlir::concretelang::dfr::init_guard ==
|
||||
mlir::concretelang::dfr::active &&
|
||||
"DFR runtime failed to initialise");
|
||||
assert(init_guard == active && "DFR runtime failed to initialise");
|
||||
}
|
||||
|
||||
void _dfr_terminate() {
|
||||
uint64_t expected = mlir::concretelang::dfr::active;
|
||||
if (mlir::concretelang::dfr::init_guard.compare_exchange_strong(
|
||||
expected, mlir::concretelang::dfr::terminated))
|
||||
uint64_t expected = active;
|
||||
if (init_guard.compare_exchange_strong(expected, terminated))
|
||||
_dfr_stop_impl();
|
||||
|
||||
assert((mlir::concretelang::dfr::init_guard ==
|
||||
mlir::concretelang::dfr::terminated ||
|
||||
mlir::concretelang::dfr::init_guard ==
|
||||
mlir::concretelang::dfr::uninitialised) &&
|
||||
assert((init_guard == terminated || init_guard == uninitialised) &&
|
||||
"DFR runtime failed to terminate");
|
||||
}
|
||||
|
||||
@@ -582,12 +495,10 @@ bool _dfr_is_distributed() { return num_nodes > 1; }
|
||||
} // namespace concretelang
|
||||
} // namespace mlir
|
||||
|
||||
void _dfr_start(int64_t use_dfr_p, void *ctx) {
|
||||
BEGIN_TIME(&mlir::concretelang::dfr::compute_timer);
|
||||
}
|
||||
void _dfr_stop(int64_t use_dfr_p) {
|
||||
END_TIME(&mlir::concretelang::dfr::compute_timer, "Compute");
|
||||
}
|
||||
using namespace mlir::concretelang::dfr;
|
||||
|
||||
void _dfr_start(int64_t use_dfr_p, void *ctx) { BEGIN_TIME(&compute_timer); }
|
||||
void _dfr_stop(int64_t use_dfr_p) { END_TIME(&compute_timer, "Compute"); }
|
||||
|
||||
void _dfr_terminate() {}
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user