Add snapsync pipeline (#3656)

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
This commit is contained in:
matkt
2022-04-04 15:22:48 +02:00
committed by GitHub
parent e336f87dc8
commit e9b9de6844
43 changed files with 2177 additions and 79 deletions

View File

@@ -19,18 +19,29 @@ import org.hyperledger.besu.plugin.services.metrics.Counter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
private final ReadPipe<T> input;
private final int maximumBatchSize;
private final Counter batchCounter;
private final Function<List<T>, Integer> stopBatchCondition;
public BatchingReadPipe(
final ReadPipe<T> input, final int maximumBatchSize, final Counter batchCounter) {
this(input, maximumBatchSize, batchCounter, ts -> maximumBatchSize - ts.size());
}
public BatchingReadPipe(
final ReadPipe<T> input,
final int maximumBatchSize,
final Counter batchCounter,
final Function<List<T>, Integer> batchEndCondition) {
this.input = input;
this.maximumBatchSize = maximumBatchSize;
this.batchCounter = batchCounter;
this.stopBatchCondition = batchEndCondition;
}
@Override
@@ -53,7 +64,15 @@ public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
}
final List<T> batch = new ArrayList<>();
batch.add(firstItem);
input.drainTo(batch, maximumBatchSize - 1);
Integer remainingData = stopBatchCondition.apply(batch);
while (remainingData > 0
&& (batch.size() + remainingData) <= maximumBatchSize
&& input.hasMore()) {
if (input.drainTo(batch, remainingData) == 0) {
break;
}
remainingData = stopBatchCondition.apply(batch);
}
batchCounter.inc();
return batch;
}
@@ -61,7 +80,15 @@ public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
@Override
public List<T> poll() {
final List<T> batch = new ArrayList<>();
input.drainTo(batch, maximumBatchSize);
Integer remainingData = stopBatchCondition.apply(batch);
while (remainingData > 0
&& (batch.size() + remainingData) <= maximumBatchSize
&& input.hasMore()) {
if (input.drainTo(batch, remainingData) == 0) {
break;
}
remainingData = stopBatchCondition.apply(batch);
}
if (batch.isEmpty()) {
// Poll has to return null if the pipe is empty
return null;
@@ -71,10 +98,12 @@ public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
}
@Override
public void drainTo(final Collection<List<T>> output, final int maxElements) {
public int drainTo(final Collection<List<T>> output, final int maxElements) {
final List<T> nextBatch = poll();
if (nextBatch != null) {
output.add(nextBatch);
return nextBatch.size();
}
return 0;
}
}

View File

@@ -117,9 +117,10 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
}
@Override
public void drainTo(final Collection<T> output, final int maxElements) {
public int drainTo(final Collection<T> output, final int maxElements) {
final int count = queue.drainTo(output, maxElements);
outputCounter.inc(count);
return count;
}
@Override

View File

@@ -253,6 +253,36 @@ public class PipelineBuilder<I, T> {
pipelineName);
}
/**
* Batches items into groups of at most <i>maximumBatchSize</i>. Batches are created eagerly to
* minimize delay so may not be full.
*
* <p>Order of items is preserved.
*
* <p>The output buffer size is reduced to <code>bufferSize / maximumBatchSize + 1</code>.
*
* @param maximumBatchSize the maximum number of items to include in a batch.
* @param stopBatchCondition the condition before ending the batch
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
*/
public PipelineBuilder<I, List<T>> inBatches(
final int maximumBatchSize, final Function<List<T>, Integer> stopBatchCondition) {
return new PipelineBuilder<>(
inputPipe,
stages,
pipes,
lastStageName,
new BatchingReadPipe<>(
pipeEnd,
maximumBatchSize,
outputCounter.labels(lastStageName + "_outputPipe", "batches"),
stopBatchCondition),
(int) Math.ceil(((double) bufferSize) / maximumBatchSize),
outputCounter,
tracingEnabled,
pipelineName);
}
/**
* Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
* is called and each item of the {@link Stream} it returns is output as an individual item. The

View File

@@ -61,6 +61,7 @@ public interface ReadPipe<T> {
*
* @param output the collection to transfer elements into
* @param maxElements the maximum number of elements to transfer
* @return the number of elements drained in the pipe
*/
void drainTo(Collection<T> output, int maxElements);
int drainTo(Collection<T> output, int maxElements);
}