mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-07 06:34:01 -05:00
* Ran gopls modernize to fix everything go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./... * Override rules_go provided dependency for golang.org/x/tools to v0.38.0. To update this, checked out rules_go, then ran `bazel run //go/tools/releaser -- upgrade-dep -mirror=false org_golang_x_tools` and copied the patches. * Fix buildtag violations and ignore buildtag violations in external * Introduce modernize analyzer package. * Add modernize "any" analyzer. * Fix violations of any analyzer * Add modernize "appendclipped" analyzer. * Fix violations of appendclipped * Add modernize "bloop" analyzer. * Add modernize "fmtappendf" analyzer. * Add modernize "forvar" analyzer. * Add modernize "mapsloop" analyzer. * Add modernize "minmax" analyzer. * Fix violations of minmax analyzer * Add modernize "omitzero" analyzer. * Add modernize "rangeint" analyzer. * Fix violations of rangeint. * Add modernize "reflecttypefor" analyzer. * Fix violations of reflecttypefor analyzer. * Add modernize "slicescontains" analyzer. * Add modernize "slicessort" analyzer. * Add modernize "slicesdelete" analyzer. This is disabled by default for now. See https://go.dev/issue/73686. * Add modernize "stringscutprefix" analyzer. * Add modernize "stringsbuilder" analyzer. * Fix violations of stringsbuilder analyzer. * Add modernize "stringsseq" analyzer. * Add modernize "testingcontext" analyzer. * Add modernize "waitgroup" analyzer. * Changelog fragment * gofmt * gazelle * Add modernize "newexpr" analyzer. * Disable newexpr until go1.26 * Add more details in WORKSPACE on how to update the override * @nalepae feedback on min() * gofmt * Fix violations of forvar
78 lines
2.0 KiB
Go
78 lines
2.0 KiB
Go
package async
|
|
|
|
import (
|
|
"errors"
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
// WorkerResults holds the outcome of a single scatter worker.
type WorkerResults struct {
	// Offset is the index of the first input entry covered by the worker's chunk.
	Offset int
	// Extent is the value the worker function returned for that chunk.
	Extent any
}
|
|
|
|
// Scatter scatters a computation across multiple goroutines.
|
|
// This breaks the task in to a number of chunks and executes those chunks in parallel with the function provided.
|
|
// Results returned are collected and presented as a set of WorkerResults, which can be reassembled by the calling function.
|
|
// Any error that occurs in the workers will be passed back to the calling function.
|
|
func Scatter(inputLen int, sFunc func(int, int, *sync.RWMutex) (any, error)) ([]*WorkerResults, error) {
|
|
if inputLen <= 0 {
|
|
return nil, errors.New("input length must be greater than 0")
|
|
}
|
|
|
|
chunkSize := calculateChunkSize(inputLen)
|
|
workers := inputLen / chunkSize
|
|
if inputLen%chunkSize != 0 {
|
|
workers++
|
|
}
|
|
resultCh := make(chan *WorkerResults, workers)
|
|
defer close(resultCh)
|
|
errorCh := make(chan error, workers)
|
|
defer close(errorCh)
|
|
mutex := new(sync.RWMutex)
|
|
for worker := 0; worker < workers; worker++ {
|
|
offset := worker * chunkSize
|
|
entries := chunkSize
|
|
if offset+entries > inputLen {
|
|
entries = inputLen - offset
|
|
}
|
|
go func(offset int, entries int) {
|
|
extent, err := sFunc(offset, entries, mutex)
|
|
if err != nil {
|
|
errorCh <- err
|
|
} else {
|
|
resultCh <- &WorkerResults{
|
|
Offset: offset,
|
|
Extent: extent,
|
|
}
|
|
}
|
|
}(offset, entries)
|
|
}
|
|
|
|
// Collect results from workers
|
|
results := make([]*WorkerResults, workers)
|
|
for i := 0; i < workers; i++ {
|
|
select {
|
|
case result := <-resultCh:
|
|
results[i] = result
|
|
case err := <-errorCh:
|
|
return nil, err
|
|
}
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// calculateChunkSize calculates a suitable chunk size for the purposes of parallelization.
// The size is items divided by runtime.GOMAXPROCS(0), rounded up, so that splitting items into chunks of the returned
// size never produces more chunks than GOMAXPROCS. The returned size is always at least 1.
func calculateChunkSize(items int) int {
	procs := runtime.GOMAXPROCS(0)

	// Start with a simple even split
	chunkSize := items / procs

	// Add 1 if we have leftovers, so that they are absorbed by the existing chunks instead of requiring extra ones.
	// The remainder is taken against the processor count: a remainder against the chunk size would miss cases such
	// as 10 items on 4 processors, which would otherwise yield 5 chunks of 2.
	if items%procs != 0 {
		chunkSize++
	}

	// Never return less than 1 (fewer items than processors is covered by the round-up above; this also covers
	// items <= 0) so that callers can safely divide by the result.
	if chunkSize < 1 {
		chunkSize = 1
	}

	return chunkSize
}
|