Add experimental --Xsnapsync-bft-enabled which enables snap sync for BFT chains (#7140)

* Create a BFT-specific pivot block handler

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Change visibility

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Refactor sync-peer-count internal variable to match name, add experimental flag to enabled snap + BFT

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Merge with main

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Fix uppercase

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Address synchronization issue with trie pruning. Create BFT-specific account range handler. Add pipeline name and logs

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Remove debug log

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* fixing snapsync for empty state

Signed-off-by: Karim Taam <karim.t2am@gmail.com>

* Don't queue up events we can't handle

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Fix timing window where a validator with an empty data dir sometimes falls back to full sync if peer status isn't received quickly enough

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Remove BFT-specific account request class. Not needed

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Refactor some more 'fast' sync variables that are common to all pivot-based sync types

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* In FULL sync mode, disable bonsai-limit-trie-logs-enabled instead of failing to start

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Add javadoc comments, clarify overriding bonsai-limit-trie-logs-enabled

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Add BFT pivot block selector tests

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Fix failure error message

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Remove the unnamed Pipe constructor and update tests to set a pipe name

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Revert some info logs back to debug given the feedback on noise in the logs syncing with holesky

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Refactor fastSyncPivotDistance to syncPivotDistance

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Incomplete refactoring

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Update BFT event queueing tests

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Event queue test fixes

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

* Remove automatic setting of bonsai-limit-trie-logs-enabled to false if sync-mode = FULL (moving to another PR)

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>

---------

Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>
Signed-off-by: Karim Taam <karim.t2am@gmail.com>
Signed-off-by: Matt Whitehead <matthew.whitehead@kaleido.io>
Co-authored-by: Karim Taam <karim.t2am@gmail.com>
This commit is contained in:
Matt Whitehead
2024-07-02 09:39:59 +01:00
committed by GitHub
parent 3a73dccd61
commit 8ca7129b0b
44 changed files with 708 additions and 127 deletions

View File

@@ -83,7 +83,7 @@ class AsyncOperationProcessor<I, O> implements Processor<I, O> {
waitForAnyFutureToComplete();
outputCompletedTasks(outputPipe);
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for processing to complete", e);
LOG.trace("Interrupted while waiting for processing to complete", e.getMessage());
} catch (final ExecutionException e) {
throw new AsyncOperationException("Async operation failed. " + e.getMessage(), e);
} catch (final TimeoutException e) {

View File

@@ -44,6 +44,7 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
private final Counter abortedItemCounter;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
private String pipeName = "";
/**
* Instantiates a new Pipe.
@@ -52,16 +53,28 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
* @param inputCounter the input counter
* @param outputCounter the output counter
* @param abortedItemCounter the aborted item counter
* @param pipeName the name of the pipe
*/
public Pipe(
final int capacity,
final Counter inputCounter,
final Counter outputCounter,
final Counter abortedItemCounter) {
final Counter abortedItemCounter,
final String pipeName) {
queue = new ArrayBlockingQueue<>(capacity);
this.inputCounter = inputCounter;
this.outputCounter = outputCounter;
this.abortedItemCounter = abortedItemCounter;
this.pipeName = pipeName;
}
/**
* Get the name of this pipe
*
* @return the name of the pipe
*/
public String getPipeName() {
return pipeName;
}
@Override
@@ -110,7 +123,7 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
}
}
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for next item", e);
LOG.trace("Interrupted while waiting for next item from pipe {}", pipeName);
}
return null;
}
@@ -140,7 +153,7 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
return;
}
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting to add to output", e);
LOG.trace("Interrupted while waiting to add to output to pipe {}", pipeName);
}
}
}

View File

@@ -89,6 +89,19 @@ public class Pipeline<I> {
this.stages = stages;
this.pipes = pipes;
this.completerStage = completerStage;
if (LOG.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Building pipeline ");
sb.append(name);
sb.append(". Stages: ");
for (Stage nextStage : stages) {
sb.append(nextStage.getName());
sb.append(" -> ");
}
sb.append("END");
LOG.trace("{}", sb.toString());
}
}
/**

View File

@@ -421,6 +421,7 @@ public class PipelineBuilder<I, T> {
newBufferSize,
outputCounter.labels(labelName, "added"),
outputCounter.labels(labelName, "removed"),
outputCounter.labels(labelName, "aborted"));
outputCounter.labels(labelName, "aborted"),
stageName);
}
}

View File

@@ -31,7 +31,8 @@ import org.junit.jupiter.api.Test;
public class BatchingReadPipeTest {
private final Pipe<String> source = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> source =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "source_pipe");
private final Counter batchCounter = mock(Counter.class);
private final BatchingReadPipe<String> batchingPipe =
new BatchingReadPipe<>(source, 3, batchCounter);

View File

@@ -24,7 +24,8 @@ import org.junit.jupiter.api.Test;
public class CompleterStageTest {
private final Pipe<String> pipe = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> pipe =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "test_pipe");
private final List<String> output = new ArrayList<>();
private final CompleterStage<String> stage = new CompleterStage<>("name", pipe, output::add);

View File

@@ -28,8 +28,10 @@ import org.junit.jupiter.api.Test;
public class FlatMapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> input =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "input_pipe");
private final Pipe<String> output =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "output_pipe");
@SuppressWarnings("unchecked")
private final Function<String, Stream<String>> mapper = mock(Function.class);

View File

@@ -22,7 +22,8 @@ import org.junit.jupiter.api.Test;
public class IteratorSourceStageTest {
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "output_pipe");
private final IteratorSourceStage<String> stage =
new IteratorSourceStage<>("name", Iterators.forArray("a", "b", "c", "d"), output);

View File

@@ -27,8 +27,10 @@ import org.junit.jupiter.api.Test;
public class MapProcessorTest {
private final Pipe<String> input = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> output = new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
private final Pipe<String> input =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "intput_pipe");
private final Pipe<String> output =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "output_pipe");
@SuppressWarnings("unchecked")
private final Function<String, String> processor = mock(Function.class);

View File

@@ -30,7 +30,8 @@ public class PipeTest {
private final Counter inputCounter = mock(Counter.class);
private final Counter outputCounter = mock(Counter.class);
private final Counter abortedItemCounter = mock(Counter.class);
private final Pipe<String> pipe = new Pipe<>(5, inputCounter, outputCounter, abortedItemCounter);
private final Pipe<String> pipe =
new Pipe<>(5, inputCounter, outputCounter, abortedItemCounter, "test_pipe");
@Test
public void shouldNotHaveMoreWhenEmptyAndClosed() {

View File

@@ -34,9 +34,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
public class ProcessingStageTest {
private final Pipe<String> inputPipe =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "input_pipe");
private final Pipe<String> outputPipe =
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER);
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "output_pipe");
@Mock private Processor<String, String> singleStep;
private ProcessingStage<String, String> stage;