fix: replace parallel processing error with fallback to sequential processing

Replace the "should not be reached" error in Tree.addBatchInDisk and
vt.addBatch with a graceful fallback to sequential processing when the
tree structure doesn't support parallel processing with nCPU goroutines.

This can happen when the tree is highly unbalanced or when the key
distribution doesn't create the expected binary tree structure. The fix
ensures correctness by falling back to the reliable sequential
processing path.

Signed-off-by: p4u <pau@dabax.net>

commit 703dba35f5
parent 64090db004
Author: p4u
Date:   2025-06-27 13:47:04 +02:00

2 changed files with 33 additions and 22 deletions
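
The change follows a simple pattern: attempt the parallel path, and when its
precondition (one bucket of keys per goroutine) doesn't hold, run the plain
sequential loop instead of returning an error. The sketch below is not arbo
code; it is a self-contained illustration of that pattern, where the
hypothetical bucketByPrefix and processSequential helpers stand in for arbo's
key bucketing and per-subtree insertion.

package main

import (
	"fmt"
	"sync"
)

// bucketByPrefix groups keys by the top l bits of their first byte. It is a
// hypothetical stand-in for arbo's key bucketing, which hands each goroutine
// the keys belonging to one subtree.
func bucketByPrefix(keys [][]byte, l int) map[int][][]byte {
	buckets := make(map[int][][]byte)
	for _, k := range keys {
		prefix := int(k[0] >> (8 - l))
		buckets[prefix] = append(buckets[prefix], k)
	}
	return buckets
}

// processSequential stands in for adding the keys one by one.
func processSequential(keys [][]byte) int {
	return len(keys)
}

// processBatch tries the parallel path and falls back to the sequential one
// when the bucketing precondition doesn't hold, mirroring this commit.
func processBatch(keys [][]byte, nCPU, l int) int {
	buckets := bucketByPrefix(keys, l)
	if len(buckets) != nCPU {
		// The key distribution didn't fill every bucket: fall back instead
		// of returning a "should not be reached" error.
		return processSequential(keys)
	}
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for _, b := range buckets {
		wg.Add(1)
		go func(b [][]byte) {
			defer wg.Done()
			n := processSequential(b)
			mu.Lock()
			total += n
			mu.Unlock()
		}(b)
	}
	wg.Wait()
	return total
}

func main() {
	// All keys share the same top two bits, so with nCPU=4 (l=2) only one
	// bucket is produced and processBatch transparently goes sequential.
	keys := [][]byte{{0x01}, {0x02}, {0x03}, {0x04}}
	fmt.Println(processBatch(keys, 4, 2)) // prints 4 either way
}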

tree.go

@@ -264,16 +264,20 @@ func (t *Tree) AddBatchWithTx(wTx db.WriteTx, keys, values [][]byte) ([]Invalid,
 	return t.addBatchInMemory(wTx, keys, values)
 }
 
+func (t *Tree) addBatchSequential(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) {
+	var invalids []Invalid
+	for i := range keys {
+		if err := t.addWithTx(wTx, keys[i], values[i]); err != nil {
+			invalids = append(invalids, Invalid{i, err})
+		}
+	}
+	return invalids, nil
+}
+
 func (t *Tree) addBatchInDisk(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) {
 	nCPU := flp2(runtime.NumCPU())
 	if nCPU == 1 || len(keys) < nCPU {
-		var invalids []Invalid
-		for i := range keys {
-			if err := t.addWithTx(wTx, keys[i], values[i]); err != nil {
-				invalids = append(invalids, Invalid{i, err})
-			}
-		}
-		return invalids, nil
+		return t.addBatchSequential(wTx, keys, values)
 	}
 	// split keys and values in buckets to add them in parallel by CPU
 	kvs, invalids, err := keysValuesToKvs(t.maxLevels, keys, values)
@@ -321,10 +325,11 @@ func (t *Tree) addBatchInDisk(wTx db.WriteTx, keys, values [][]byte) ([]Invalid,
 	}
 	if len(subRoots) != nCPU {
-		return nil, fmt.Errorf("this error should not be reached."+
-			" len(subRoots) != nCPU, len(subRoots)=%d, nCPU=%d."+
-			" Please report it in a new issue:"+
-			" https://github.com/vocdoni/arbo/issues/new", len(subRoots), nCPU)
+		// Tree structure doesn't support parallel processing with nCPU goroutines.
+		// This can happen when the tree is highly unbalanced or when key distribution
+		// doesn't create the expected binary tree structure. Fall back to sequential
+		// processing to ensure correctness.
+		return t.addBatchSequential(wTx, keys, values)
 	}
 	invalidsInBucket := make([][]Invalid, nCPU)
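
The parallel path sizes its worker count with flp2(runtime.NumCPU()), the
largest power of two not exceeding the CPU count. With nCPU = 2^l workers, a
fully populated tree yields exactly nCPU sub-roots at depth l, which is the
expectation the removed error used to enforce. A minimal floor-power-of-two
sketch (illustrative only; arbo defines its own flp2 helper, which may be
implemented differently):

package main

import "fmt"

// flp2 returns the largest power of two that is less than or equal to n,
// e.g. flp2(6) == 4 and flp2(8) == 8.
func flp2(n int) int {
	p := 1
	for p*2 <= n {
		p *= 2
	}
	return p
}

func main() {
	for _, n := range []int{1, 3, 6, 8, 12} {
		fmt.Printf("flp2(%d) = %d\n", n, flp2(n))
	}
}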

vt.go

@@ -86,6 +86,17 @@ func newVT(maxLevels int, hash HashFunction) vt {
 	}
 }
 
+// addBatchSequential adds a batch of key-values to the VirtualTree sequentially.
+func (t *vt) addBatchSequential(ks, vs [][]byte) ([]Invalid, error) {
+	var invalids []Invalid
+	for i := 0; i < len(ks); i++ {
+		if err := t.add(0, ks[i], vs[i]); err != nil {
+			invalids = append(invalids, Invalid{i, err})
+		}
+	}
+	return invalids, nil
+}
+
 // addBatch adds a batch of key-values to the VirtualTree. Returns an array
 // containing the indexes of the keys failed to add. Does not include the
 // computation of hashes of the nodes neither the storage of the key-values of
@@ -94,13 +105,7 @@ func newVT(maxLevels int, hash HashFunction) vt {
 func (t *vt) addBatch(ks, vs [][]byte) ([]Invalid, error) {
 	nCPU := flp2(runtime.NumCPU())
 	if nCPU == 1 || len(ks) < nCPU {
-		var invalids []Invalid
-		for i := 0; i < len(ks); i++ {
-			if err := t.add(0, ks[i], vs[i]); err != nil {
-				invalids = append(invalids, Invalid{i, err})
-			}
-		}
-		return invalids, nil
+		return t.addBatchSequential(ks, vs)
 	}
 	l := int(math.Log2(float64(nCPU)))
@@ -171,10 +176,11 @@ func (t *vt) addBatch(ks, vs [][]byte) ([]Invalid, error) {
 		}
 	}
 	if len(nodesAtL) != nCPU {
-		return nil, fmt.Errorf("this error should not be reached."+
-			" len(nodesAtL) != nCPU, len(nodesAtL)=%d, nCPU=%d."+
-			" Please report it in a new issue:"+
-			" https://github.com/vocdoni/arbo/issues/new", len(nodesAtL), nCPU)
+		// Tree structure doesn't support parallel processing with nCPU goroutines.
+		// This can happen when the tree is highly unbalanced or when key distribution
+		// doesn't create the expected binary tree structure. Fall back to sequential
+		// processing to ensure correctness.
+		return t.addBatchSequential(ks, vs)
 	}
 	subRoots := make([]*node, nCPU)
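
The vt.go counterpart makes the assumption explicit: with nCPU = 2^l workers,
addBatch expects exactly 2^l nodes at depth l = log2(nCPU) of the virtual
tree, one subtree per goroutine. If the tree is small or unbalanced, that
level is only partially populated, the count falls short, and the code now
takes the sequential path instead of failing. A toy count of nodes at a given
level in an unbalanced binary tree shows how the expectation can break (the
node type here is illustrative, not arbo's internal one):

package main

import "fmt"

// node is a toy binary tree node used only for this illustration.
type node struct {
	l, r *node
}

// nodesAtLevel counts the nodes present at the given depth below n.
func nodesAtLevel(n *node, depth int) int {
	if n == nil {
		return 0
	}
	if depth == 0 {
		return 1
	}
	return nodesAtLevel(n.l, depth-1) + nodesAtLevel(n.r, depth-1)
}

func main() {
	// Unbalanced tree: the root's right child is a leaf, so level 2 holds
	// only 2 nodes instead of the 4 a balanced tree would provide.
	root := &node{
		l: &node{l: &node{}, r: &node{}},
		r: &node{},
	}
	nCPU := 4 // l = log2(nCPU) = 2
	got := nodesAtLevel(root, 2)
	fmt.Printf("nodes at level 2: %d, expected for the parallel path: %d\n", got, nCPU)
	if got != nCPU {
		fmt.Println("-> fall back to sequential processing")
	}
}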