diff --git a/tree.go b/tree.go index 1de6d58..afa16d0 100644 --- a/tree.go +++ b/tree.go @@ -264,16 +264,20 @@ func (t *Tree) AddBatchWithTx(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, return t.addBatchInMemory(wTx, keys, values) } +func (t *Tree) addBatchSequential(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) { + var invalids []Invalid + for i := range keys { + if err := t.addWithTx(wTx, keys[i], values[i]); err != nil { + invalids = append(invalids, Invalid{i, err}) + } + } + return invalids, nil +} + func (t *Tree) addBatchInDisk(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) { nCPU := flp2(runtime.NumCPU()) if nCPU == 1 || len(keys) < nCPU { - var invalids []Invalid - for i := range keys { - if err := t.addWithTx(wTx, keys[i], values[i]); err != nil { - invalids = append(invalids, Invalid{i, err}) - } - } - return invalids, nil + return t.addBatchSequential(wTx, keys, values) } // split keys and values in buckets to add them in parallel by CPU kvs, invalids, err := keysValuesToKvs(t.maxLevels, keys, values) @@ -321,10 +325,11 @@ func (t *Tree) addBatchInDisk(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, } if len(subRoots) != nCPU { - return nil, fmt.Errorf("this error should not be reached."+ - " len(subRoots) != nCPU, len(subRoots)=%d, nCPU=%d."+ - " Please report it in a new issue:"+ - " https://github.com/vocdoni/arbo/issues/new", len(subRoots), nCPU) + // Tree structure doesn't support parallel processing with nCPU goroutines. + // This can happen when the tree is highly unbalanced or when key distribution + // doesn't create the expected binary tree structure. Fall back to sequential + // processing to ensure correctness. + return t.addBatchSequential(wTx, keys, values) } invalidsInBucket := make([][]Invalid, nCPU) diff --git a/vt.go b/vt.go index 29582a8..e3e6732 100644 --- a/vt.go +++ b/vt.go @@ -86,6 +86,17 @@ func newVT(maxLevels int, hash HashFunction) vt { } } +// addBatchSequential adds a batch of key-values to the VirtualTree sequentially. +func (t *vt) addBatchSequential(ks, vs [][]byte) ([]Invalid, error) { + var invalids []Invalid + for i := 0; i < len(ks); i++ { + if err := t.add(0, ks[i], vs[i]); err != nil { + invalids = append(invalids, Invalid{i, err}) + } + } + return invalids, nil +} + // addBatch adds a batch of key-values to the VirtualTree. Returns an array // containing the indexes of the keys failed to add. Does not include the // computation of hashes of the nodes neither the storage of the key-values of @@ -94,13 +105,7 @@ func newVT(maxLevels int, hash HashFunction) vt { func (t *vt) addBatch(ks, vs [][]byte) ([]Invalid, error) { nCPU := flp2(runtime.NumCPU()) if nCPU == 1 || len(ks) < nCPU { - var invalids []Invalid - for i := 0; i < len(ks); i++ { - if err := t.add(0, ks[i], vs[i]); err != nil { - invalids = append(invalids, Invalid{i, err}) - } - } - return invalids, nil + return t.addBatchSequential(ks, vs) } l := int(math.Log2(float64(nCPU))) @@ -171,10 +176,11 @@ func (t *vt) addBatch(ks, vs [][]byte) ([]Invalid, error) { } } if len(nodesAtL) != nCPU { - return nil, fmt.Errorf("this error should not be reached."+ - " len(nodesAtL) != nCPU, len(nodesAtL)=%d, nCPU=%d."+ - " Please report it in a new issue:"+ - " https://github.com/vocdoni/arbo/issues/new", len(nodesAtL), nCPU) + // Tree structure doesn't support parallel processing with nCPU goroutines. + // This can happen when the tree is highly unbalanced or when key distribution + // doesn't create the expected binary tree structure. Fall back to sequential + // processing to ensure correctness. + return t.addBatchSequential(ks, vs) } subRoots := make([]*node, nCPU)