Files
AlexandreBelling 35700222f8 Feature/optimization distributedlab rebased (#356)
* fft optimization: used parallel approach from gnark-crypto

* following OmegaTymbJIep: Use atomic uint instead of the chan of uints in parallel task tracking

* Adding bitreverse from gnark-crypto

* Use atomic counter instead of uint chan to avoid locks

* Add test for AtomicCounter

* fixed build issues due to changes in keccak

* increase butterflyThreshold (because we now use kerDITNP_256 instead of kerDITNP_8)

* rollback butterflyThreshold

* adding check for nBTasks in parallel.execute

* using GOMAXPROCS instead of NumCPU

* adding execution pool

* fixed initializing thread pool

* adding ExecutePoolChunkyWithCache

* returning back from ExecutePoolChunkyWithCache to ExecutePoolChunky, adding ExecutePoolChunky to multi_to_single_point.go

* adding ExecutePoolChunkyWithAllocated

* replacing ExecutePoolChunky with ExecuteChunky in InitOpeningWithLC

* returning back  ExecutePoolChunky in InitOpeningWithLC, adding additional ExecutePoolChunky into quotient

* replacing ExecutePoolChunky with Execute into quotient

* fix: removing redundant mainWg in multi_to_single_point.go

* fix multi_to_single_point.go concurrency waiting, retry quotient with ExecutePoolChunky

* ExecutePoolChunky in opening_prover.go, rolling back quotient.go

* ExecutePoolChunky in multi_to_single_point.go

* removing mutexes from quotient

* Avoid redundant bigint allocations

* fixed pool - removing redundant

* fixed - removing redundant comments

* removing keccak optimization

---------

Co-authored-by: olegfomenko <oleg.fomenko2002@gmail.com>
Co-authored-by: Anton Levochko <hykakdela@gmail.com>
2024-11-28 23:17:41 +01:00

50 lines
1.0 KiB
Go

package parallel
import (
"runtime"
"sync"
"github.com/sirupsen/logrus"
)
// ExecuteChunky runs work in parallel. This implementation is optimized for
// the case where the workload is a sequence of unbalanced, long-running jobs:
// each worker goroutine claims one iteration at a time from a shared atomic
// counter, so a slow iteration does not stall a statically assigned chunk.
//
// nbIterations is the total number of iterations. work is invoked with
// half-open ranges [start, stop); here each call covers a single iteration
// (stop == start+1). An optional positive numcpus[0] overrides the default
// worker count of runtime.GOMAXPROCS(0).
func ExecuteChunky(nbIterations int, work func(start, stop int), numcpus ...int) {
	numcpu := runtime.GOMAXPROCS(0)
	if len(numcpus) > 0 && numcpus[0] > 0 {
		numcpu = numcpus[0]
	}

	// Fewer iterations than workers: dynamic dispatch brings no benefit,
	// so fall back to the simple static scheduler.
	if nbIterations < numcpu {
		logrus.Debugf("Loss in parallelization time numcpu = %v and nbIterations = %v", numcpu, nbIterations)
		Execute(nbIterations, work, numcpu)
		return
	}

	// The wait group ensures that all worker goroutines have completed
	// their iterations before the function returns.
	wg := sync.WaitGroup{}
	wg.Add(nbIterations)

	// Atomic counter handing out iteration indices one at a time; Next
	// reports ok == false once all nbIterations indices are exhausted.
	taskCounter := NewAtomicCounter(nbIterations)

	// Each worker repeatedly claims the next iteration until none remain.
	for p := 0; p < numcpu; p++ {
		go func() {
			for {
				taskID, ok := taskCounter.Next()
				if !ok {
					break
				}
				work(taskID, taskID+1)
				wg.Done()
			}
		}()
	}

	wg.Wait()
}