Files
linea-monorepo/prover/utils/parallel/thread_aware.go
AlexandreBelling 35700222f8 Feature/optimization distributedlab rebased (#356)
* fft optimization: used parallel approach from gnark-crypto

* following OmegaTymbJIep: Use atomic uint instead of the chan of uints in parallel task tracking

* Adding bitreverse from gnark-crypto

* Use atomic counter instead of uint chan to avoid locks

* Add test for AtomicCounter

* fixed build issues due to changes in keccak

* increase butterflyThreshold (because we now use kerDITNP_256 instead of kerDITNP_8)

* rollback butterflyThreshold

* adding check for nBTasks in parallel.execute

* using GOMAXPROCS instead of NumCPU

* adding execution pool

* fixed initializing thread pool

* adding ExecutePoolChunkyWithCache

* returning back from ExecutePoolChunkyWithCache to ExecutePoolChunky, adding ExecutePoolChunky to multi_to_single_point.go

* adding ExecutePoolChunkyWithAllocated

* replacing ExecutePoolChunky with ExecuteChunky in InitOpeningWithLC

* returning back  ExecutePoolChunky in InitOpeningWithLC, adding additional ExecutePoolChunky into quotient

* replacing ExecutePoolChunky with Execute into quotient

* fix: removing redundant mainWg in multi_to_single_point.go

* fix multi_to_single_point.go concurrency waiting, retry quotient with ExecutePoolChunky

* ExecutePoolChunky in opening_prover.go, rolling back quotient.go

* ExecutePoolChunky in multi_to_single_point.go

* removing mutexes from quotient

* Avoid redundant bigint allocations

* fixed pool - removing redundant

* fixed - removing redundant comments

* removing keccak optimization

---------

Co-authored-by: olegfomenko <oleg.fomenko2002@gmail.com>
Co-authored-by: Anton Levochko <hykakdela@gmail.com>
2024-11-28 23:17:41 +01:00

44 lines
781 B
Go

package parallel
import (
"runtime"
"sync"
)
// ThreadInit is called exactly once by each worker goroutine, before it
// processes any task, with that goroutine's threadID. It lets callers set up
// per-thread state (e.g. scratch buffers) without synchronization.
type ThreadInit func(threadID int)
// Worker processes a single task. taskID identifies the task being run and
// threadID identifies the goroutine running it, so the worker can index into
// per-thread state prepared by a ThreadInit.
type Worker func(taskID, threadID int)
// ExecuteThreadAware runs worker over every taskID in [0, nbIterations) using
// a pool of goroutines. Each goroutine first calls init with its own threadID
// so the worker can rely on per-thread state without locking.
//
// The pool size defaults to runtime.GOMAXPROCS(0) and may be overridden by
// passing a positive value as numcpus[0]. The function returns only once all
// nbIterations tasks have completed. It is a no-op when nbIterations <= 0.
func ExecuteThreadAware(nbIterations int, init ThreadInit, worker Worker, numcpus ...int) {

	// Guard: wg.Add with a negative count panics, and zero tasks need no work.
	if nbIterations <= 0 {
		return
	}

	numcpu := runtime.GOMAXPROCS(0)
	if len(numcpus) > 0 && numcpus[0] > 0 {
		numcpu = numcpus[0]
	}

	// Never spawn more goroutines than there are tasks; the extras would
	// only call init and exit immediately.
	if numcpu > nbIterations {
		numcpu = nbIterations
	}

	// The wait group ensures that all tasks have completed before the
	// function returns.
	wg := sync.WaitGroup{}
	wg.Add(nbIterations)

	// taskCounter hands out task IDs atomically; Next reports ok=false once
	// all nbIterations IDs have been consumed.
	taskCounter := NewAtomicCounter(nbIterations)

	// Each goroutine pulls task IDs from the shared counter until exhaustion.
	for p := 0; p < numcpu; p++ {
		threadID := p // capture per-iteration for the closure (pre-Go 1.22 semantics)
		go func() {
			init(threadID)
			for {
				taskID, ok := taskCounter.Next()
				if !ok {
					break
				}
				worker(taskID, threadID)
				wg.Done()
			}
		}()
	}
	wg.Wait()
}