Files
prysm/async/scatter.go
Preston Van Loon 2fd6bd8150 Add golang.org/x/tools modernize static analyzer and fix violations (#15946)
* Ran gopls modernize to fix everything

go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./...

* Override rules_go provided dependency for golang.org/x/tools to v0.38.0.

To update this, checked out rules_go, then ran `bazel run //go/tools/releaser -- upgrade-dep -mirror=false org_golang_x_tools` and copied the patches.

* Fix buildtag violations and ignore buildtag violations in external

* Introduce modernize analyzer package.

* Add modernize "any" analyzer.

* Fix violations of any analyzer

* Add modernize "appendclipped" analyzer.

* Fix violations of appendclipped

* Add modernize "bloop" analyzer.

* Add modernize "fmtappendf" analyzer.

* Add modernize "forvar" analyzer.

* Add modernize "mapsloop" analyzer.

* Add modernize "minmax" analyzer.

* Fix violations of minmax analyzer

* Add modernize "omitzero" analyzer.

* Add modernize "rangeint" analyzer.

* Fix violations of rangeint.

* Add modernize "reflecttypefor" analyzer.

* Fix violations of reflecttypefor analyzer.

* Add modernize "slicescontains" analyzer.

* Add modernize "slicessort" analyzer.

* Add modernize "slicesdelete" analyzer. This is disabled by default for now. See https://go.dev/issue/73686.

* Add modernize "stringscutprefix" analyzer.

* Add modernize "stringsbuilder" analyzer.

* Fix violations of stringsbuilder analyzer.

* Add modernize "stringsseq" analyzer.

* Add modernize "testingcontext" analyzer.

* Add modernize "waitgroup" analyzer.

* Changelog fragment

* gofmt

* gazelle

* Add modernize "newexpr" analyzer.

* Disable newexpr until go1.26

* Add more details in WORKSPACE on how to update the override

* @nalepae feedback on min()

* gofmt

* Fix violations of forvar
2025-11-14 01:27:22 +00:00

78 lines
2.0 KiB
Go

package async
import (
"errors"
"runtime"
"sync"
)
// WorkerResults are the results of a scatter worker.
type WorkerResults struct {
Offset int
Extent any
}
// Scatter scatters a computation across multiple goroutines.
// This breaks the task in to a number of chunks and executes those chunks in parallel with the function provided.
// Results returned are collected and presented as a set of WorkerResults, which can be reassembled by the calling function.
// Any error that occurs in the workers will be passed back to the calling function.
func Scatter(inputLen int, sFunc func(int, int, *sync.RWMutex) (any, error)) ([]*WorkerResults, error) {
if inputLen <= 0 {
return nil, errors.New("input length must be greater than 0")
}
chunkSize := calculateChunkSize(inputLen)
workers := inputLen / chunkSize
if inputLen%chunkSize != 0 {
workers++
}
resultCh := make(chan *WorkerResults, workers)
defer close(resultCh)
errorCh := make(chan error, workers)
defer close(errorCh)
mutex := new(sync.RWMutex)
for worker := 0; worker < workers; worker++ {
offset := worker * chunkSize
entries := chunkSize
if offset+entries > inputLen {
entries = inputLen - offset
}
go func(offset int, entries int) {
extent, err := sFunc(offset, entries, mutex)
if err != nil {
errorCh <- err
} else {
resultCh <- &WorkerResults{
Offset: offset,
Extent: extent,
}
}
}(offset, entries)
}
// Collect results from workers
results := make([]*WorkerResults, workers)
for i := 0; i < workers; i++ {
select {
case result := <-resultCh:
results[i] = result
case err := <-errorCh:
return nil, err
}
}
return results, nil
}
// calculateChunkSize calculates a suitable chunk size for the purposes of parallelization.
func calculateChunkSize(items int) int {
// Start with a simple even split
chunkSize := items / runtime.GOMAXPROCS(0)
// Add 1 if we have leftovers (or if we have fewer items than processors).
if chunkSize == 0 || items%chunkSize != 0 {
chunkSize++
}
return chunkSize
}