// Part of the Concrete Compiler Project, under the BSD3 License with Zama // Exceptions. See // https://github.com/zama-ai/concrete-compiler-internal/blob/master/LICENSE.txt // for license information. #ifndef CONCRETELANG_DFR_DISTRIBUTED_GENERIC_TASK_SERVER_HPP #define CONCRETELANG_DFR_DISTRIBUTED_GENERIC_TASK_SERVER_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "concretelang/Runtime/DFRuntime.hpp" #include "concretelang/Runtime/key_manager.hpp" extern WorkFunctionRegistry *node_level_work_function_registry; extern std::list new_allocated; using namespace hpx::naming; using namespace hpx::components; using namespace hpx::collectives; struct OpaqueInputData { OpaqueInputData() = default; OpaqueInputData(std::string wfn_name, std::vector params, std::vector param_sizes, std::vector output_sizes, bool alloc_p = false) : wfn_name(wfn_name), params(std::move(params)), param_sizes(std::move(param_sizes)), output_sizes(std::move(output_sizes)), alloc_p(alloc_p) {} OpaqueInputData(const OpaqueInputData &oid) : wfn_name(std::move(oid.wfn_name)), params(std::move(oid.params)), param_sizes(std::move(oid.param_sizes)), output_sizes(std::move(oid.output_sizes)), alloc_p(oid.alloc_p) {} friend class hpx::serialization::access; template void load(Archive &ar, const unsigned int version) { ar &wfn_name; ar ¶m_sizes; ar &output_sizes; for (auto p : param_sizes) { char *param = new char[p]; // TODO: Optimise these serialisation operations for (size_t i = 0; i < p; ++i) ar ¶m[i]; params.push_back((void *)param); } alloc_p = true; } template void save(Archive &ar, const unsigned int version) const { ar &wfn_name; ar ¶m_sizes; ar &output_sizes; for (size_t p = 0; p < params.size(); ++p) for (size_t i = 0; i < param_sizes[p]; ++i) ar &static_cast(params[p])[i]; } HPX_SERIALIZATION_SPLIT_MEMBER() std::string wfn_name; std::vector params; std::vector param_sizes; std::vector output_sizes; bool alloc_p = false; }; struct OpaqueOutputData { OpaqueOutputData() = default; OpaqueOutputData(std::vector outputs, std::vector output_sizes, bool alloc_p = false) : outputs(std::move(outputs)), output_sizes(std::move(output_sizes)), alloc_p(alloc_p) {} OpaqueOutputData(const OpaqueOutputData &ood) : outputs(std::move(ood.outputs)), output_sizes(std::move(ood.output_sizes)), alloc_p(ood.alloc_p) {} friend class hpx::serialization::access; template void load(Archive &ar, const unsigned int version) { ar &output_sizes; for (auto p : output_sizes) { char *output = new char[p]; for (size_t i = 0; i < p; ++i) ar &output[i]; outputs.push_back((void *)output); new_allocated.push_back((void *)output); } alloc_p = true; } template void save(Archive &ar, const unsigned int version) const { ar &output_sizes; for (size_t p = 0; p < outputs.size(); ++p) { for (size_t i = 0; i < output_sizes[p]; ++i) ar &static_cast(outputs[p])[i]; // TODO: investigate if HPX is automatically deallocating // these. Here it could be safely assumed that these would no // longer be live. // delete (char*)outputs[p]; } } HPX_SERIALIZATION_SPLIT_MEMBER() std::vector outputs; std::vector output_sizes; bool alloc_p = false; }; struct GenericComputeServer : component_base { GenericComputeServer() = default; // Component actions exposed OpaqueOutputData execute_task(const OpaqueInputData &inputs) { auto wfn = node_level_work_function_registry->getWorkFunctionPointer( inputs.wfn_name); std::vector outputs; switch (inputs.output_sizes.size()) { case 1: { void *output = (void *)(new char[inputs.output_sizes[0]]); switch (inputs.params.size()) { case 0: wfn(output); break; case 1: wfn(inputs.params[0], output); break; case 2: wfn(inputs.params[0], inputs.params[1], output); break; case 3: wfn(inputs.params[0], inputs.params[1], inputs.params[2], output); break; default: HPX_THROW_EXCEPTION(hpx::no_success, "GenericComputeServer::execute_task", "Error: number of task parameters not supported."); } outputs = {output}; new_allocated.push_back(output); break; } case 2: { void *output1 = (void *)(new char[inputs.output_sizes[0]]); void *output2 = (void *)(new char[inputs.output_sizes[1]]); switch (inputs.params.size()) { case 0: wfn(output1, output2); break; case 1: wfn(inputs.params[0], output1, output2); break; case 2: wfn(inputs.params[0], inputs.params[1], output1, output2); break; case 3: wfn(inputs.params[0], inputs.params[1], inputs.params[2], output1, output2); break; default: HPX_THROW_EXCEPTION(hpx::no_success, "GenericComputeServer::execute_task", "Error: number of task parameters not supported."); } outputs = {output1, output2}; new_allocated.push_back(output1); new_allocated.push_back(output2); break; } case 3: { void *output1 = (void *)(new char[inputs.output_sizes[0]]); void *output2 = (void *)(new char[inputs.output_sizes[1]]); void *output3 = (void *)(new char[inputs.output_sizes[2]]); switch (inputs.params.size()) { case 0: wfn(output1, output2, output3); break; case 1: wfn(inputs.params[0], output1, output2, output3); break; case 2: wfn(inputs.params[0], inputs.params[1], output1, output2, output3); break; case 3: wfn(inputs.params[0], inputs.params[1], inputs.params[2], output1, output2, output3); break; default: HPX_THROW_EXCEPTION(hpx::no_success, "GenericComputeServer::execute_task", "Error: number of task parameters not supported."); } outputs = {output1, output2, output3}; new_allocated.push_back(output1); new_allocated.push_back(output2); new_allocated.push_back(output3); break; } default: HPX_THROW_EXCEPTION(hpx::no_success, "GenericComputeServer::execute_task", "Error: number of task outputs not supported."); } if (inputs.alloc_p) for (auto p : inputs.params) delete ((char *)p); return OpaqueOutputData(std::move(outputs), std::move(inputs.output_sizes), inputs.alloc_p); } HPX_DEFINE_COMPONENT_ACTION(GenericComputeServer, execute_task); }; HPX_REGISTER_ACTION_DECLARATION(GenericComputeServer::execute_task_action, GenericComputeServer_execute_task_action) HPX_REGISTER_COMPONENT_MODULE() HPX_REGISTER_COMPONENT(hpx::components::component, GenericComputeServer) HPX_REGISTER_ACTION(GenericComputeServer::execute_task_action, GenericComputeServer_execute_task_action) struct GenericComputeClient : client_base { typedef client_base base_type; GenericComputeClient() = default; GenericComputeClient(id_type id) : base_type(std::move(id)) {} hpx::future execute_task(const OpaqueInputData &inputs) { typedef GenericComputeServer::execute_task_action action_type; return hpx::async(this->get_id(), inputs); } }; #endif