Phase 1 is multithreaded and works. Started writing multithreaded version of phase 2.

This commit is contained in:
Koren-Brand
2024-07-29 14:49:40 +03:00
parent 5c75b6dc35
commit 5c8cbfc83d
5 changed files with 425 additions and 157 deletions

7
.gitignore vendored
View File

@@ -17,4 +17,9 @@
**/Cargo.lock
**/icicle/build/
**/wrappers/rust/icicle-cuda-runtime/src/bindings.rs
**/build/*
**/build*
# TODO remove bellow when development of cpu msm ends
icicle_v3/backend/cpu/src/curve/Makefile
icicle_v3/backend/cpu/src/curve/*.o
icicle_v3/backend/cpu/src/curve/*.txt
icicle_v3/backend/cpu/src/curve/msm_test

View File

@@ -9,6 +9,8 @@
#include <fstream>
#include <cassert>
#include <iostream>
#include <cmath>
#include <algorithm>
#include <atomic>
@@ -95,8 +97,8 @@ Point* msm_bucket_accumulator(
if (negate_p_and_s) scalar = sca_test::neg(scalar);
for (int j = 0; j < precomp_f; j++)
{
aff_test point = is_b_mont? aff_test::from_montgomery(bases[msm_size*j + i]) :
bases[msm_size*j + i];
aff_test point = is_b_mont? aff_test::from_montgomery(bases[precomp_f*i+j]) :
bases[precomp_f*i+j];
if (negate_p_and_s) point = aff_test::neg(point);
for (int k = 0; k < num_bms; k++)
{
@@ -112,17 +114,17 @@ Point* msm_bucket_accumulator(
#ifdef DEBUG_PRINTS
if (Point::is_zero(bkts[bkt_idx]))
{
trace_f << bkt_idx << ":\tWrite free cell:\t" << point.x << '\n';
trace_f << '#' << bkt_idx << ":\tWrite free cell:\t" << point.x << '\n';
}
else
{
trace_f << bkt_idx << ":\tRead for addition:\t" << Point::to_affine(bkts[bkt_idx]).x << "\t(With new point:\t" << point.x << " = " << Point::to_affine(bkts[bkt_idx] + point).x << ")\n";
trace_f << bkt_idx << ":\tWrite (res) free cell:\t" << Point::to_affine(bkts[bkt_idx] + point).x << '\n';
trace_f << '#' << bkt_idx << ":\tRead for addition:\t" << Point::to_affine(bkts[bkt_idx]).x << "\t(With new point:\t" << point.x << " = " << Point::to_affine(bkts[bkt_idx] + point).x << ")\n";
trace_f << '#' << bkt_idx << ":\tWrite (res) free cell:\t" << Point::to_affine(bkts[bkt_idx] + point).x << '\n';
} // TODO remove double addition
#endif
bkts[num_bkts*k + curr_coeff] = Point::is_zero(bkts[num_bkts*k + curr_coeff])? Point::from_affine(point) :
bkts[num_bkts*k + curr_coeff] + point;
bkts[num_bkts*k + curr_coeff] + point; // TODO change here order of precomp
carry = 0;
}
else
@@ -131,12 +133,12 @@ Point* msm_bucket_accumulator(
#ifdef DEBUG_PRINTS
if (Point::is_zero(bkts[bkt_idx]))
{
trace_f << bkt_idx << ":\tWrite free cell:\t" << aff_test::neg(point).x << '\n';
trace_f << '#' << bkt_idx << ":\tWrite free cell:\t" << aff_test::neg(point).x << '\n';
}
else
{
trace_f << bkt_idx << ":\tRead for subtraction:\t" << Point::to_affine(bkts[bkt_idx]).x << "\t(With new point:\t" << point.x << " = " << Point::to_affine(bkts[bkt_idx] - point).x << ")\n";
trace_f << bkt_idx << ":\tWrite (res) free cell:\t" << Point::to_affine(bkts[bkt_idx] - point).x << '\n';
trace_f << '#' << bkt_idx << ":\tRead for subtraction:\t" << Point::to_affine(bkts[bkt_idx]).x << "\t(With new point:\t" << point.x << " = " << Point::to_affine(bkts[bkt_idx] - point).x << ")\n";
trace_f << '#' << bkt_idx << ":\tWrite (res) free cell:\t" << Point::to_affine(bkts[bkt_idx] - point).x << '\n';
} // TODO remove double addition
#endif
@@ -194,6 +196,7 @@ Point* msm_bm_sum(
if (!Point::is_zero(partial_sum)) bm_sums[k] = bm_sums[k] + partial_sum;
}
}
return bm_sums;
}
@@ -283,7 +286,7 @@ eIcicleError cpu_msm_single_thread(
template<typename Point>
ThreadTask<Point>::ThreadTask()
: bkt_idx(-1),
: return_idx(-1),
p1(Point::zero()),
p2(Point::zero()),
result(Point::zero())
@@ -291,8 +294,8 @@ ThreadTask<Point>::ThreadTask()
}
template<typename Point>
ThreadTask<Point>::ThreadTask(const ThreadTask<Point>& other)
: bkt_idx(-1),
ThreadTask<Point>::ThreadTask(const ThreadTask<Point>& other) // TODO delete when changing to task array instead of vector
: return_idx(-1),
p1(Point::zero()),
p2(Point::zero()),
result(Point::zero())
@@ -300,62 +303,96 @@ ThreadTask<Point>::ThreadTask(const ThreadTask<Point>& other)
}
template<typename Point>
inline void ThreadTask<Point>::run()
void ThreadTask<Point>::run(int tid, std::vector<int>& idle_idxs, bool& kill_thread)
{
if (in_ready.load(std::memory_order_acquire))
bool rdy_status = in_ready.load(std::memory_order_acquire);
if (!rdy_status) idle_idxs.push_back(pidx); // TODO remove when finishing debugging
while(!rdy_status)
{
// assert(!out_done.load(std::memory_order_acquire) );
in_ready.store(false, std::memory_order_relaxed);
result = p1 + p2;
out_done.store(true, std::memory_order_release);
std::this_thread::sleep_for(std::chrono::microseconds(10));
rdy_status = in_ready.load(std::memory_order_acquire);
if (kill_thread) return;
}
in_ready.store(false, std::memory_order_release);
result = p1 + p2;
out_done.store(true, std::memory_order_release);
}
template<typename Point>
inline void ThreadTask<Point>::new_task(int in_idx, const Point& in_p1, const Point& in_p2)
void ThreadTask<Point>::new_task(const int in_idx, const Point& in_p1, const Point& in_p2)
{
out_done.store(false, std::memory_order_release);
bkt_idx = in_idx;
return_idx = in_idx;
p1 = in_p1; // Copied by value to not be linked to the original bucket
p2 = in_p2;
in_ready.store(true, std::memory_order_release);
}
template<typename Point>
inline void ThreadTask<Point>::chain_task(const Point in_p2)
void ThreadTask<Point>::chain_task(const Point in_p2)
{
// std::unique_lock<std::mutex> temp_lock(idle_mtx);
out_done.store(false, std::memory_order_release);
p2 = in_p2;
p1 = result;
in_ready.store(true, std::memory_order_release);
// idle.notify_one();
}
template<typename Point>
void ec_add_thread(int tid, std::vector<ThreadTask<Point>>& tasks, bool& kill_thread)
WorkThread<Point>::~WorkThread()
{
/**
* Working function of the work threads -
* handles EC addition if given valid inputs in one of the threads interfaces
* @param com_a - thread interface (see struct definition above)
* @param com_b - second thread interface
* @param kill_thread - flag to finish the function (and the thread)
*/
while (true)
kill_thread = true;
thread.join();
}
template<typename Point>
void WorkThread<Point>::thread_setup(const int tid, const int task_per_thread)
{
this->tid = tid;
for (int j = 0; j < task_per_thread; j++) tasks.push_back(ThreadTask<Point>()); // TODO change to array
thread = std::thread(&WorkThread<Point>::add_ec_tasks, this, std::ref(kill_thread)); // TODO kill_thread is accessible from this
}
template<typename Point>
void WorkThread<Point>::add_ec_tasks(bool& kill_thread)
{
while (!kill_thread)
{
int i = 0;
for (ThreadTask<Point>& task : tasks)
{
task.run();
task.run(tid, idle_idxs, kill_thread);
#ifdef DEBUG_PRINTS
if (tid == 2) std::cout << i << ", bkt_idx=" << tasks[task_round_robin].return_idx << "\tDone\n";
i++;
#endif
}
if (kill_thread) break;
}
}
template<typename Point>
Point int_mult(Point p, int x)
{
if (Point::is_zero(p) || x==0) return Point::zero();
Point result = Point::zero();
if (x & 1) result = p;
x >>= 1;
while (x > 0)
{
p = p + p;
if (x & 1) result = result + p;
x >>= 1;
}
return result;
}
template <typename Point>
Msm<Point>::Msm(const MSMConfig& config)
: n_threads(config.ext.get<int>("n_threads")),
tasks_per_thread(config.ext.get<int>("tasks_per_thread")), // TODO add tp config?
// tasks_per_thread(config.ext.get<int>("tasks_per_thread")), // TODO add tp config?
c(config.ext.get<int>("c")), // TODO calculate instead of param
num_bkts(1 << (c - 1)),
precomp_f(config.precompute_factor),
num_bms(((sca_test::NBITS - 1) / (config.precompute_factor * c)) + 1),
are_scalars_mont(config.are_scalars_montgomery_form),
@@ -364,19 +401,30 @@ Msm<Point>::Msm(const MSMConfig& config)
#ifdef DEBUG_PRINTS
trace_f("trace_bucket_multi.txt"), // TODO delete
#endif
thread_round_robin(0)
thread_round_robin(0),
num_bkts(1 << (c - 1)),
log_num_segments(std::max((int)std::ceil(std::log2(num_bms / n_threads)), 0)),
num_bm_segments(1 << log_num_segments),
segment_size(num_bkts >> log_num_segments),
tasks_per_thread(std::max(((int)(num_bms / n_threads)) << (log_num_segments+1), 2))
{
// Phase 1
bkts = new Point[num_bms*num_bkts];
// std::fill_n(bkts, num_bkts*num_bms, Point::zero()); // TODO remove as occupancy removes the need of initial value
std::fill_n(bkts, num_bkts*num_bms, Point::zero()); // TODO remove as occupancy removes the need of initial value
bkts_occupancy = new bool[num_bms*num_bkts];
std::fill_n(bkts_occupancy, num_bkts*num_bms, false);
// Phase 2
phase2_sums = new Point[num_bms * num_bm_segments * 2]; // Both triangle and line sum one after the other for each segment
task_assigned_to_sum = new std::tuple<int,int>[num_bms * num_bm_segments * 2];
// std::fill_n(task_assigned_to_sum, std::make_tuple(-1, -1));
for (int i = 0; i < num_bms * num_bm_segments * 2; i++) task_assigned_to_sum[i] = std::make_tuple(1,1);
bm_sums = new Point[num_bms];
threads = new WorkThread<Point>[n_threads];
for (int i = 0; i < n_threads; i++)
{
threads[i].tid = i;
for (int j = 0; j < tasks_per_thread; j++) threads[i].tasks.push_back(ThreadTask<Point>());
threads[i].thread = std::thread(ec_add_thread<Point>, threads[i].tid, std::ref(threads[i].tasks), std::ref(kill_threads));
}
for (int i = 0; i < n_threads; i++) threads[i].thread_setup(i, tasks_per_thread);
#ifdef DEBUG_PRINTS
if (!trace_f.good()) { throw std::invalid_argument("Can't open file"); } // TODO remove log
#endif
@@ -385,15 +433,17 @@ Msm<Point>::Msm(const MSMConfig& config)
template <typename Point>
Msm<Point>::~Msm()
{
// std::cout << "\n\nDestroying msm object at the end of the run\n\n"; // COMMENT why am I not seeing it one the console? Isn't the destructor automatically called when msm goes out of scope?
kill_threads = true;
for (int i = 0; i < n_threads; i++) threads[i].thread.join();
std::cout << "\n\nDestroying msm object at the end of the run\n\n"; // COMMENT why am I not seeing it one the console? Isn't the destructor automatically called when msm goes out of scope?
kill_threads = true; // TODO check if it is used after kill_thread has been added to WorkThread destructor
// for (int i = 0; i < n_threads; i++) threads[i].thread.join();
#ifdef DEBUG_PRINTS
trace_f.close();
#endif
delete[] threads;
delete[] bkts;
delete[] bkts_occupancy;
std::cout << "Loops without a free thread:\t" << loop_count << '\n';
}
template<typename Point>
@@ -420,52 +470,82 @@ void Msm<Point>::wait_for_idle()
* @return boolean, a flag indicating all threads are idle
*/
bool all_threads_idle = false;
int count = 0;
while (!all_threads_idle)
{
all_threads_idle = true;
for (int i = 0; i < n_threads; i++)
{
int task_idx = 0;
for (ThreadTask<Point>& task : threads[i].tasks)
int og_task_round_robin = threads[i].task_round_robin;
#ifdef DEBUG_PRINTS
trace_f << "Finishing thread " << i << ", starting at task: " << og_task_round_robin << '\n';
#endif
for (int j = og_task_round_robin; j < og_task_round_robin + tasks_per_thread; j++)
{
int task_idx = j;
if (task_idx >= tasks_per_thread) task_idx -= tasks_per_thread;
// For readability
ThreadTask<Point>& task = threads[i].tasks[task_idx];
if (task.out_done.load(std::memory_order_acquire))
{
if (task.bkt_idx >= 0)
if (task.return_idx >= 0)
{
if (bkts_occupancy[task.bkt_idx])
if (bkts_occupancy[task.return_idx])
{
#ifdef DEBUG_PRINTS
trace_f << task.bkt_idx << ":\tFCollision addition - bkts' cell:\t" << Point::to_affine(bkts[task.bkt_idx]).x << "\t(With add res point:\t" << Point::to_affine(task.result).x << " = " << Point::to_affine(bkts[task.bkt_idx] + task.result).x << ")\t(" << i << ',' << task_idx << ")\n";
trace_f << '#' << task.return_idx << ":\tFCollision addition - bkts' cell:\t" << Point::to_affine(bkts[task.return_idx]).x << "\t(With add res point:\t" << Point::to_affine(task.result).x << " = " << Point::to_affine(bkts[task.return_idx] + task.result).x << ")\t(" << i << ',' << task_idx << ")\n";
#endif
bkts_occupancy[task.bkt_idx] = false;
task.chain_task(bkts[task.bkt_idx]);
all_threads_idle = false;
// std::cout << "\n" << i << ":\t(" << task_idx << "->" << threads[i].task_round_robin << ")\tChaining\n\n";
bkts_occupancy[task.return_idx] = false;
int bkt_idx = task.return_idx;
task.return_idx = -1;
threads[i].tasks[threads[i].task_round_robin].new_task(bkt_idx, task.result, bkts[bkt_idx]);
if (threads[i].task_round_robin == tasks_per_thread - 1) threads[i].task_round_robin = 0;
else threads[i].task_round_robin++;
all_threads_idle = false; // This thread isn't idle due to the newly assigned task
}
else
{
bkts[task.bkt_idx] = task.result;
bkts[task.return_idx] = task.result;
#ifdef DEBUG_PRINTS
trace_f << task.bkt_idx << ":\tFWrite (res) free cell:\t" << Point::to_affine(bkts[task.bkt_idx]).x << "\t(" << i << ',' << task_idx << ")\n";
trace_f << '#' << task.return_idx << ":\tFWrite (res) free cell:\t" << Point::to_affine(bkts[task.return_idx]).x << "\t(" << i << ',' << task_idx << ")\n";
#endif
bkts_occupancy[task.bkt_idx] = true;
task.bkt_idx = -1; // To ensure no repeated handling of outputs
bkts_occupancy[task.return_idx] = true;
task.return_idx = -1; // To ensure no repeated handling of outputs
}
}
else
{
#ifdef DEBUG_PRINTS
trace_f << "Task " << task_idx << " idle\n";
#endif
}
}
else { all_threads_idle = false; break;
#ifdef DEBUG_PRINTS
trace_f << '#' << task.return_idx << ":\t(" << i << ',' << task_idx << ") not done\tres=" << task.result << "\tstatus:" << task.in_ready << ',' << task.out_done << '\n';
#endif
}
else all_threads_idle = false;
task_idx++;
}
}
// std::this_thread::sleep_for(std::chrono::milliseconds(5000));
}
// trace_f.flush();
}
template<typename Point>
template<typename Base>
void Msm<Point>::push_addition(
void Msm<Point>::phase1_push_addition(
const unsigned int task_bkt_idx,
const Point bkt,
const Base& base
const Base& base,
int pidx
) // TODO add option of adding different types
{
/**
@@ -474,54 +554,68 @@ void Msm<Point>::push_addition(
* @param bkt - bkt to be added. it is passed by value to allow the appropriate cell in the bucket array to be "free" an overwritten without affecting the working thread
* @param p2 - point to be added
*/
int count_thread_iters = 0;
bool assigned_to_thread = false;
int task_idx = 0;
while (!assigned_to_thread)
{
task_idx = 0;
for (ThreadTask<Point>& task : threads[thread_round_robin].tasks)
// For readability
ThreadTask<Point>& task = threads[thread_round_robin].tasks[threads[thread_round_robin].task_round_robin];
if (task.out_done.load(std::memory_order_acquire))
{
if (task.out_done.load(std::memory_order_acquire))
num_additions++;
if (task.return_idx >= 0)
{
if (task.bkt_idx >= 0)
if (bkts_occupancy[task.return_idx])
{
if (bkts_occupancy[task.bkt_idx])
{
#ifdef DEBUG_PRINTS
trace_f << task.bkt_idx << ":\tCollision addition - bkts' cell:\t" << Point::to_affine(bkts[task.bkt_idx]).x << "\t(With add res point:\t" << Point::to_affine(task.result).x << " = " << Point::to_affine(bkts[task.bkt_idx] + task.result).x << ")\t(" << thread_round_robin << ',' << task_idx << ")\n";
#endif
bkts_occupancy[task.bkt_idx] = false;
task.chain_task(bkts[task.bkt_idx]);
}
else
{
#ifdef DEBUG_PRINTS
trace_f << task.bkt_idx << ":\tWrite (res) free cell:\t" << Point::to_affine(task.result).x << "\t(" << thread_round_robin << ',' << task_idx << ")\n";
#endif
bkts[task.bkt_idx] = task.result;
bkts_occupancy[task.bkt_idx] = true;
task.new_task(task_bkt_idx, bkt, Point::from_affine(base)); // TODO support multiple types
assigned_to_thread = true;
break;
}
#ifdef DEBUG_PRINTS
trace_f << '#' << task.return_idx << ":\tCollision addition - bkts' cell:\t" << Point::to_affine(bkts[task.return_idx]).x << "\t(With add res point:\t" << Point::to_affine(task.result).x << " = " << Point::to_affine(bkts[task.return_idx] + task.result).x << ")\t(" << thread_round_robin << ',' << threads[thread_round_robin].task_round_robin<< ")\n";
#endif
bkts_occupancy[task.return_idx] = false;;
task.pidx = pidx;
task.chain_task(bkts[task.return_idx]);
}
else
{
task.new_task(task_bkt_idx, bkt, Point::from_affine(base)); // TODO support multiple types
#ifdef DEBUG_PRINTS
trace_f << '#' << task.return_idx << ":\tWrite (res) free cell:\t" << Point::to_affine(task.result).x << "\t(" << thread_round_robin << ',' << threads[thread_round_robin].task_round_robin << ")\n";
#endif
bkts[task.return_idx] = task.result;
bkts_occupancy[task.return_idx] = true;
task.pidx = pidx;
task.new_task(task_bkt_idx, bkt, Point::from_affine(base)); // TODO support multiple types
assigned_to_thread = true;
break;
#ifdef DEBUG_PRINTS
trace_f << '#' << task_bkt_idx << ":\tAssigned to:\t(" << thread_round_robin << ',' << threads[thread_round_robin].task_round_robin << ")\n";
// trace_f.flush();
#endif
// break;
}
}
task_idx++;
else
{
task.pidx = pidx;
task.new_task(task_bkt_idx, bkt, Point::from_affine(base)); // TODO support multiple types
assigned_to_thread = true;
#ifdef DEBUG_PRINTS
trace_f << '#' << task_bkt_idx << ":\tAssigned to:\t(" << thread_round_robin << ',' << threads[thread_round_robin].task_round_robin << ")\n";
// trace_f.flush();
#endif
// break;
}
if (threads[thread_round_robin].task_round_robin == tasks_per_thread - 1) threads[thread_round_robin].task_round_robin = 0;
else threads[thread_round_robin].task_round_robin++;
}
// Move to next thread after checking all current thread's tasks
if (thread_round_robin == n_threads - 1) thread_round_robin = 0;
else thread_round_robin++;
count_thread_iters++;
if (count_thread_iters == n_threads)
{
count_thread_iters = 0;
loop_count++;
}
}
#ifdef DEBUG_PRINTS
trace_f << task_bkt_idx << ":\tAssigned to:\t(" << (thread_round_robin-1) << ',' << (task_idx-1) << ")\n";
#endif
}
@@ -545,7 +639,8 @@ Point* Msm<Point>::bucket_accumulator(
const int num_windows_m1 = (sca_test::NBITS -1) / c; // +1 for ceiling than -1 for m1
int carry = 0;
std::cout << "\n\nc=" << c << "\tpcf=" << precomp_f << "\tnum bms=" << num_bms << "\n\n\n";
std::cout << "\n\nc=" << c << "\tpcf=" << precomp_f << "\tnum bms=" << num_bms << "\tntrds,tasks=" << n_threads << ',' << tasks_per_thread << "\n\n\n";
std::cout << log_num_segments << '\n' << segment_size << "\n\n\n";
for (int i = 0; i < msm_size; i++)
{
carry = 0;
@@ -555,8 +650,8 @@ Point* Msm<Point>::bucket_accumulator(
if (negate_p_and_s) scalar = sca_test::neg(scalar);
for (int j = 0; j < precomp_f; j++)
{
aff_test point = are_points_mont? aff_test::from_montgomery(bases[msm_size*j + i]) :
bases[msm_size*j + i];
aff_test point = are_points_mont? aff_test::from_montgomery(bases[precomp_f*i+j]) :
bases[precomp_f*i+j]; // TODO change here order of precomp
if (negate_p_and_s) point = aff_test::neg(point);
for (int k = 0; k < num_bms; k++)
{
@@ -580,16 +675,17 @@ Point* Msm<Point>::bucket_accumulator(
{
bkts_occupancy[bkt_idx] = false;
#ifdef DEBUG_PRINTS
trace_f << bkt_idx << ":\tRead for addition:\t" << Point::to_affine(bkts[bkt_idx]).x << "\t(With new point:\t" << (carry > 0? aff_test::neg(point) : point).x << " = " << Point::to_affine(bkts[bkt_idx] + (carry > 0? aff_test::neg(point) : point)).x << ")\n";
trace_f << '#' << bkt_idx << ":\tRead for addition:\t" << Point::to_affine(bkts[bkt_idx]).x << "\t(With new point:\t" << (carry > 0? aff_test::neg(point) : point).x << " = " << Point::to_affine(bkts[bkt_idx] + (carry > 0? aff_test::neg(point) : point)).x << ")\n";
// trace_f.flush();
#endif
push_addition<aff_test>(bkt_idx, bkts[bkt_idx], carry > 0? aff_test::neg(point) : point);
phase1_push_addition<aff_test>(bkt_idx, bkts[bkt_idx], carry > 0? aff_test::neg(point) : point, i);
}
else
{
bkts_occupancy[bkt_idx] = true;
bkts[bkt_idx] = carry > 0? Point::neg(Point::from_affine(point)) : Point::from_affine(point);
#ifdef DEBUG_PRINTS
trace_f << bkt_idx << ":\tWrite free cell:\t" << (carry > 0? aff_test::neg(point) : point).x << '\n';
trace_f << '#' << bkt_idx << ":\tWrite free cell:\t" << (carry > 0? aff_test::neg(point) : point).x << '\n';
#endif
}
}
@@ -597,41 +693,66 @@ Point* Msm<Point>::bucket_accumulator(
}
}
}
std::cout << "Wait for idle\n";
wait_for_idle();
// bkt_file();
#ifdef DEBUG_PRINTS
bkt_file();
#endif
return bkts;
}
template<typename Point>
std::tuple<int, int> Msm<Point>::phase2_push_addition(
const unsigned int task_bkt_idx,
const Point& bkt,
const Point& base
)
{
return std::make_tuple(-1,-1);
}
template <typename Point>
Point* Msm<Point>::bm_sum(
Point* bkts,
const unsigned int c,
const unsigned int num_bms)
Point* Msm<Point>::bm_sum()
{
/**
* Sum bucket modules to one point each
* @param bkts - point array containing all bkts ordered by bucket module
* @param c - bucket width
* @param num_bms - number of bucket modules
* @return bm_sums - point array containing the bucket modules' sums
*/
auto t = Timer("P2:bucket-module-sum");
uint32_t num_bkts = 1<<(c-1); // NOTE implicitly assuming that c<32
Point* bm_sums = new Point[num_bms];
for (int k = 0; k < num_bms; k++)
// Init values of partial (line) and total (triangle) sum
for (int i = 0; i < num_bms; i++)
{
bm_sums[k] = bkts_occupancy[num_bkts*k]? Point::copy(bkts[num_bkts*k]) : Point::zero(); // Start with bucket zero that holds the weight <num_bkts>
Point partial_sum = bkts_occupancy[num_bkts*k]? Point::copy(bkts[num_bkts*k]) : Point::zero();
for (int i = num_bkts-1; i > 0; i--)
for (int j = 0; j < num_bm_segments-1; j++)
{
if (bkts_occupancy[num_bkts*k + i] && !Point::is_zero(bkts[num_bkts*k + i])) partial_sum = partial_sum + bkts[num_bkts*k +i];
if (!Point::is_zero(partial_sum)) bm_sums[k] = bm_sums[k] + partial_sum;
phase2_sums[num_bm_segments*i + 2*j] = bkts[num_bkts*i + segment_size*(j+1)]; // +1 because the sum starts from the most significant element of the segment
phase2_sums[num_bm_segments*i + 2*j+1] = bkts[num_bkts*i + segment_size*(j+1)];
}
phase2_sums[num_bm_segments*(i+1) - 2] = bkts[num_bkts*i]; // The most significant bucket of every bm is stored in address 0
phase2_sums[num_bm_segments*(i+1) - 1] = bkts[num_bkts*i];
}
for (int k = segment_size-1; k > 0; k--)
{
for (int i = 0; i < num_bms; i++)
{
for (int j = 0; j < num_bm_segments; j++)
{
// For readability
int triangle_sum_idx = num_bm_segments*i + 2*j;
int line_sum_idx = num_bm_segments*i + 2*j + 1;
int bkt_idx = num_bkts*i + segment_size*j + k;
int assigned_thread = std::get<0>(task_assigned_to_sum[line_sum_idx]);
int assigned_task = std::get<1>(task_assigned_to_sum[line_sum_idx]);
if (assigned_thread >= 0)
{
while (!threads[assigned_thread].task[assigned_task].out_done.load(std::memory_order_acquire)) { } // TODO add sleep
// task_assigned_to_sum[]
}
phase2_push_addition(line_sum_idx, phase2_sums[line_sum_idx], bkts[bkt_idx]);
}
}
}
return bm_sums;
}
template <typename Point>
@@ -657,7 +778,9 @@ eIcicleError cpu_msm(
if (config.ext.get<int>("n_threads") <= 0) { return eIcicleError::INVALID_ARGUMENT; }
Point* bkts = msm->bucket_accumulator(scalars, bases, msm_size);
Point* bm_sums = msm->bm_sum(bkts, c, num_bms);
// Point* bm_sums = msm->bm_sum(bkts, c, num_bms);
Point* bm_sums = msm_bm_sum<Point>(bkts, c, num_bms);
Point res = msm_final_sum<Point>(bm_sums, c, num_bms, config.are_points_montgomery_form);
results[0] = res;
delete[] bm_sums;
@@ -676,13 +799,15 @@ eIcicleError cpu_msm_ref(
const MSMConfig& config,
Point* results)
{
// std::cout << "\n\nYuval's reference\n\n";
const unsigned int precomp_f = config.precompute_factor;
Point res = Point::zero();
for (auto i = 0; i < msm_size; ++i) {
sca_test scalar = config.are_scalars_montgomery_form? sca_test::from_montgomery(scalars[i]) :
scalars[i];
aff_test point = config.are_points_montgomery_form? aff_test::from_montgomery(bases[i]) :
bases[i];
aff_test point = config.are_points_montgomery_form? aff_test::from_montgomery(bases[precomp_f*i]) :
bases[precomp_f*i];
// std::cout << scalar << "\t*\t" << point << '\n';
res = res + scalar * Point::from_affine(point);
}
// results[0] = config.are_points_montgomery_form? Point::to_montgomery(res) : res;
@@ -704,17 +829,22 @@ eIcicleError cpu_msm_precompute_bases(
const unsigned int c = config.ext.get<int>("c");
const unsigned int num_bms_no_precomp = (sca_test::NBITS - 1) / c + 1;
const unsigned int shift = c * ((num_bms_no_precomp - 1) / precompute_factor + 1);
// std::cout << "Starting precompute\n";
// std::cout << c << ',' << shift << '\n';
for (int i = 0; i < nof_bases; i++)
{
output_bases[i] = input_bases[i]; // COMMENT Should I copy? (not by reference)
output_bases[precompute_factor*i] = input_bases[i]; // COMMENT Should I copy? (not by reference)
proj_test point = proj_test::from_affine(is_mont? A::from_montgomery(input_bases[i]) : input_bases[i]);
// std::cout << "OG point[" << i << "]:\t" << point << '\n';
for (int j = 1; j < precompute_factor; j++)
{
for (int k = 0; k < shift; k++)
{
point = proj_test::dbl(point);
// std::cout << point << '\n';
}
output_bases[nof_bases*j + i] = is_mont? A::to_montgomery(proj_test::to_affine(point)) : proj_test::to_affine(point);
// std::cout << "Shifted point[" << i << "]:\t" << point << "\nStored in index=" << (precompute_factor*i+j) << '\n';
output_bases[precompute_factor*i + j] = is_mont? A::to_montgomery(proj_test::to_affine(point)) : proj_test::to_affine(point); // TODO change here order of precomp
}
}
return eIcicleError::SUCCESS;
@@ -770,8 +900,9 @@ REGISTER_MSM_BACKEND("CPU_REF", cpu_msm_single_thread<proj_test>);
int main() {
int seed = 0;
auto t = Timer("Time till failure");
while (true){
const int logn = 17;
const int logn = 5;
const int N = 1 << logn;
auto scalars = std::make_unique<sca_test[]>(N);
auto bases = std::make_unique<aff_test[]>(N);
@@ -789,12 +920,16 @@ int main() {
proj_test result{};
auto run = [&](const char* dev_type, proj_test* result, const char* msg, bool measure, int iters,auto cpu_msm) {
const int log_p = 2;
const int log_p = 3;
const int c = std::max(logn, 8) - 1;
const int pcf = 1 << log_p;
int hw_threads = std::thread::hardware_concurrency();
if (hw_threads <= 0) { std::cout << "Unable to detect number of hardware supported threads - fixing it to 1\n"; }
// const int n_threads = (hw_threads > 1)? hw_threads-2 : 1;
const int n_threads = 8;
const int tasks_per_thread = 2;
const int tasks_per_thread = 4;
auto config = default_msm_config();
config.ext.set("c", c);
@@ -826,7 +961,8 @@ int main() {
std::cout << proj_test::to_affine(result_cpu_ref) << std::endl;
std::cout << "Seed is: " << seed << '\n';
assert(result_cpu==result_cpu_ref);
std::cout << "HERE\n";
}
return 0;

View File

@@ -2,9 +2,13 @@
#define CPU_MSM
// #define STANDALONE
#include <atomic>
#include <mutex>
#include <tuple>
#include <unistd.h> // TODO remove
#include "icicle/errors.h"
#include <atomic>
#include "icicle/config_extension.h"
using namespace icicle;
#ifndef STANDALONE
@@ -211,40 +215,57 @@ using namespace icicle;
template<typename Point> // TODO add support for two different point types
class ThreadTask
{
{ // TODO turn to class and check which members can be private (or at least function by getter/setter)
public:
std::atomic<bool> in_ready{false};
int bkt_idx;
Point p1; // TODO Result will be stored here
int return_idx;
int pidx=0; // TODO remove after debugging done
Point p1;
Point p2; // TODO allow different types of points to be added
Point result;
Point result; // TODO Remove and result will be stored in p1
std::atomic<bool> out_done{true};
ThreadTask();
ThreadTask(const ThreadTask<Point>& other);
void run();
void new_task(int in_idx, const Point& in_p1, const Point& in_p2);
void run(int tid, std::vector<int>& idle_idxs, bool& kill_thread);
void new_task(const int in_idx, const Point& in_p1, const Point& in_p2);
void chain_task(const Point in_p2);
// TODO implement bellow to make class members private
inline bool set_ready();
inline bool is_done();
inline Point get_result();
inline void set_idle();
};
template<typename Point>
struct WorkThread {
struct WorkThread { // TODO turn to class and check which members can be private (or at least function by getter/setter)
int tid;
std::vector<int> idle_idxs;
std::thread thread;
int task_round_robin=0;
std::vector<ThreadTask<Point>> tasks;
std::condition_variable idle;
std::mutex idle_mtx;
bool kill_thread=false; // TODO rethink kill thread as phase already allows breaking
~WorkThread();
void thread_setup(const int tid, const int task_per_thread);
void add_ec_tasks(bool& kill_thread);
};
template <typename Point>
class Msm
{
private:
// std::vector<WorkThread<Point>> threads;
WorkThread<Point>* threads;
const unsigned int n_threads;
const unsigned int tasks_per_thread;
bool kill_threads;
const unsigned int tasks_per_thread; // TODO check effect of the num of tasks one p1 performance and maybe have separate tasks_per_thread values for p1 and p2
int thread_round_robin;
bool any_thread_free;
bool kill_threads;
const unsigned int c;
const unsigned int num_bkts;
const unsigned int num_bms;
@@ -252,10 +273,18 @@ private:
const bool are_scalars_mont;
const bool are_points_mont;
int loop_count = 0;
int num_additions = 0;
// Phase 1
Point* bkts;
bool* bkts_occupancy;
// Phase 2
const int log_num_segments;
const int num_bm_segments;
const int segment_size;
Point* phase2_sums;
std::tuple<int, int>* task_assigned_to_sum;
Point* bm_sums;
// Phase 3
bool mid_phase3;
@@ -267,10 +296,23 @@ private:
void wait_for_idle();
// template <typename Base>
// void push_addition( const unsigned int task_bkt_idx,
// const Point bkt,
// const Base& base,
// int pidx,
// Point* result_arr,
// bool* );
template <typename Base>
void push_addition( const unsigned int task_bkt_idx,
const Point bkt,
const Base& base);
void phase1_push_addition( const unsigned int task_bkt_idx,
const Point bkt,
const Base& base,
int pidx );
std::tuple<int, int> phase2_push_addition( const unsigned int task_bkt_idx,
const Point& bkt,
const Point& base );
void bkt_file(); // TODO remove
@@ -281,11 +323,9 @@ public:
Point* bucket_accumulator( const sca_test* scalars,
const aff_test* bases,
const unsigned int msm_size);
Point* bm_sum( Point* bkts,
const unsigned int c,
const unsigned int num_bms);
const unsigned int msm_size); // TODO change type in the end to void
Point* bm_sum();
};
#endif

View File

@@ -2,4 +2,5 @@ pushd /Users/koren/Documents/REPOS/icicle/icicle_v3
rm -rf build/*
cmake -DCURVE=bn254 -DBUILD_CUDA_BE=OFF -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTS=ON -S . -B build
cmake --build build -j
mkdir ./build/generated_data
popd

View File

@@ -1,7 +1,7 @@
#include <gtest/gtest.h>
#include <iostream>
#include <list>
#include <fstream>
#include "dlfcn.h"
#include "icicle/runtime.h"
@@ -57,17 +57,68 @@ public:
}
};
template <typename T>
bool read_inputs(T* arr, const int arr_size, const std::string fname)
{
std::ifstream in_file(fname);
bool status = in_file.is_open();
if (status)
{
for (int i = 0; i < arr_size; i++)
{
in_file.read(reinterpret_cast<char*>(&arr[i]), sizeof(T));
}
in_file.close();
}
return status;
}
template <typename T>
void store_inputs(T* arr, const int arr_size, const std::string fname)
{
std::ofstream out_file(fname);
if (!out_file.is_open())
{
std::cerr << "Failed to open " << fname << " for writing.\n";
return;
}
for (int i = 0; i < arr_size; i++)
{
out_file.write(reinterpret_cast<char*>(&arr[i]), sizeof(T));
}
out_file.close();
}
void get_inputs(affine_t* bases, scalar_t* scalars, const int n) // TODO add precompute factor
{
// Scalars
std::string scalar_file = "build/generated_data/scalars_N" + std::to_string(n) + ".dat";
if (!read_inputs<scalar_t>(scalars, n, scalar_file))
{
std::cout << "Generating scalars.\n";
scalar_t::rand_host_many(scalars, n);
store_inputs<scalar_t>(scalars, n, scalar_file);
}
// Bases
std::string base_file = "build/generated_data/bases_N" + std::to_string(n) + ".dat";
if (!read_inputs<affine_t>(bases, n, base_file))
{
std::cout << "Generating bases.\n";
projective_t::rand_host_many_affine(bases, n);
store_inputs<affine_t>(bases, n, base_file);
}
}
TEST_F(CurveApiTest, MSM)
{
const int logn = 10;
const int logn = 12;
const int N = 1 << logn;
auto scalars = std::make_unique<scalar_t[]>(N);
auto bases = std::make_unique<affine_t[]>(N);
bool conv_mont = false;
get_inputs(bases.get(), scalars.get(), N);
scalar_t::rand_host_many(scalars.get(), N);
projective_t::rand_host_many_affine(bases.get(), N);
if (conv_mont) {for (int i=0; i<N; i++) bases[i] = affine_t::to_montgomery(bases[i]); }
projective_t result_cpu{};
projective_t result_cpu_dbl_n_add{};
@@ -82,8 +133,14 @@ TEST_F(CurveApiTest, MSM)
const int log_p = 2;
const int c = std::max(logn, 8) - 1;
const int pcf = 1 << log_p;
const int n_threads = 8;
const int tasks_per_thread = 2;
auto config = default_msm_config();
config.ext.set("c", c);
config.ext.set("n_threads", n_threads);
config.ext.set("tasks_per_thread", tasks_per_thread);
config.precompute_factor = pcf;
config.are_scalars_montgomery_form = false;
config.are_points_montgomery_form = conv_mont;
@@ -93,20 +150,49 @@ TEST_F(CurveApiTest, MSM)
pc_config.ext.set("is_mont", config.are_points_montgomery_form);
auto precomp_bases = std::make_unique<affine_t[]>(N*pcf);
msm_precompute_bases(bases.get(), N, pcf, pc_config, precomp_bases.get());
// TODO update cmake to include directory?
std::string precomp_fname = "build/generated_data/precomp_N" + std::to_string(N) + "_pcf" + std::to_string(pcf) + ".dat";
if (!read_inputs<affine_t>(precomp_bases.get(), N*pcf, precomp_fname))
{
std::cout << "Precomputing bases." << '\n';
msm_precompute_bases(bases.get(), N, pcf, pc_config, precomp_bases.get());
store_inputs<affine_t>(precomp_bases.get(), N*pcf, precomp_fname);
}
// int test_size = 10000;
// std::cout << "NUm additions:\t" << test_size << '\n';
// scalar_t* a = new scalar_t[test_size];
// scalar_t* b = new scalar_t[test_size];
// scalar_t* apb = new scalar_t[test_size];
// {
// scalar_t* bases_p = scalars.get();
// for (int i = 0; i < test_size; i++)
// {
// a[i] = bases_p[i];
// b[i] = bases_p[i + test_size];
// apb[i] = scalar_t::zero();
// }
// }
START_TIMER(MSM_sync)
for (int i = 0; i < iters; ++i) {
// TODO real test
// msm_precompute_bases(bases.get(), N, 1, default_msm_pre_compute_config(), bases.get());
msm(scalars.get(), precomp_bases.get(), N, config, result);
}
// for (int i = 0; i < test_size; i++)
// {
// apb[i] = a[i] * b[i];
// }
END_TIMER(MSM_sync, msg, measure);
};
// run("CPU", &result_cpu_dbl_n_add, "CPU msm", false /*=measure*/, 1 /*=iters*/); // warmup
run("CPU", &result_cpu, "CPU msm", VERBOSE /*=measure*/, 1 /*=iters*/);
run("CPU_REF", &result_cpu_ref, "CPU_REF msm", VERBOSE /*=measure*/, 1 /*=iters*/);
ASSERT_EQ((result_cpu),(result_cpu_ref));
int iters = 1;
run("CPU", &result_cpu, "CPU msm", VERBOSE /*=measure*/, iters /*=iters*/);
run("CPU_REF", &result_cpu_ref, "CPU_REF msm", VERBOSE /*=measure*/, iters /*=iters*/);
std::cout << projective_t::to_affine(result_cpu) << std::endl;
std::cout << projective_t::to_affine(result_cpu_ref) << std::endl;
ASSERT_EQ(result_cpu,result_cpu_ref);
}
int main(int argc, char** argv)