mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-08 23:17:54 -05:00
Fix javadocs to allow build to pass in JDK 17 (#4834)
- Added missing javadocs so that javadoc doclint passes against JDK 17 (invoke by Besu gradle build). - Exclude following packages from javadoc lint: org.hyperledger.besu.privacy.contracts.generated org.hyperledger.besu.tests.acceptance.* - Temporarily exclude ethereum and evm submodule for doc lint checks. - Run the javadoc task using GitHub actions (use Java 17) to report any javadoc errors during the PR builds - Updating plugin-api build.gradle with new hash as javadoc comments caused it to change Signed-off-by: Usman Saleem <usman@usmans.info>
This commit is contained in:
@@ -36,15 +36,22 @@ import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.tuweni.bytes.Bytes;
|
||||
|
||||
/** The In memory key value storage. */
|
||||
public class InMemoryKeyValueStorage implements KeyValueStorage {
|
||||
|
||||
private final Map<Bytes, byte[]> hashValueStore;
|
||||
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
|
||||
/** Instantiates a new In memory key value storage. */
|
||||
public InMemoryKeyValueStorage() {
|
||||
this(new HashMap<>());
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new In memory key value storage.
|
||||
*
|
||||
* @param hashValueStore the hash value store
|
||||
*/
|
||||
protected InMemoryKeyValueStorage(final Map<Bytes, byte[]> hashValueStore) {
|
||||
this.hashValueStore = hashValueStore;
|
||||
}
|
||||
@@ -138,6 +145,11 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
|
||||
return new KeyValueStorageTransactionTransitionValidatorDecorator(new InMemoryTransaction());
|
||||
}
|
||||
|
||||
/**
|
||||
* Key set.
|
||||
*
|
||||
* @return the set of keys
|
||||
*/
|
||||
public Set<Bytes> keySet() {
|
||||
return Set.copyOf(hashValueStore.keySet());
|
||||
}
|
||||
@@ -180,6 +192,11 @@ public class InMemoryKeyValueStorage implements KeyValueStorage {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump.
|
||||
*
|
||||
* @param ps the PrintStream where to report the dump
|
||||
*/
|
||||
public void dump(final PrintStream ps) {
|
||||
final Lock lock = rwLock.readLock();
|
||||
lock.lock();
|
||||
|
||||
@@ -30,6 +30,7 @@ import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** The In memory storage plugin. */
|
||||
public class InMemoryStoragePlugin implements BesuPlugin {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(InMemoryStoragePlugin.class);
|
||||
@@ -87,11 +88,17 @@ public class InMemoryStoragePlugin implements BesuPlugin {
|
||||
() -> LOG.error("Failed to register KeyValueFactory due to missing StorageService."));
|
||||
}
|
||||
|
||||
/** The Memory key value storage factory. */
|
||||
public static class MemoryKeyValueStorageFactory implements KeyValueStorageFactory {
|
||||
|
||||
private final String name;
|
||||
private final Map<SegmentIdentifier, InMemoryKeyValueStorage> storageMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Instantiates a new Memory key value storage factory.
|
||||
*
|
||||
* @param name the name
|
||||
*/
|
||||
public MemoryKeyValueStorageFactory(final String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@@ -19,12 +19,18 @@ import static com.google.common.base.Preconditions.checkState;
|
||||
import org.hyperledger.besu.plugin.services.exception.StorageException;
|
||||
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
|
||||
|
||||
/** The Key value storage transaction transition validator decorator. */
|
||||
public class KeyValueStorageTransactionTransitionValidatorDecorator
|
||||
implements KeyValueStorageTransaction {
|
||||
|
||||
private final KeyValueStorageTransaction transaction;
|
||||
private boolean active = true;
|
||||
|
||||
/**
|
||||
* Instantiates a new Key value storage transaction transition validator decorator.
|
||||
*
|
||||
* @param toDecorate the to decorate
|
||||
*/
|
||||
public KeyValueStorageTransactionTransitionValidatorDecorator(
|
||||
final KeyValueStorageTransaction toDecorate) {
|
||||
this.transaction = toDecorate;
|
||||
|
||||
@@ -46,6 +46,11 @@ public class LimitedInMemoryKeyValueStorage implements KeyValueStorage {
|
||||
private final Cache<Bytes, byte[]> storage;
|
||||
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
|
||||
/**
|
||||
* Instantiates a new Limited in memory key value storage.
|
||||
*
|
||||
* @param maxSize the max size
|
||||
*/
|
||||
public LimitedInMemoryKeyValueStorage(final long maxSize) {
|
||||
storage = CacheBuilder.newBuilder().maximumSize(maxSize).build();
|
||||
}
|
||||
|
||||
@@ -32,6 +32,12 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
*/
|
||||
public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
|
||||
/**
|
||||
* Gets segment identifier by name.
|
||||
*
|
||||
* @param segment the segment
|
||||
* @return the segment identifier by name
|
||||
*/
|
||||
S getSegmentIdentifierByName(SegmentIdentifier segment);
|
||||
|
||||
/**
|
||||
@@ -40,9 +46,18 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
* @param segment the segment
|
||||
* @param key Index into persistent data repository.
|
||||
* @return The value persisted at the key index.
|
||||
* @throws StorageException the storage exception
|
||||
*/
|
||||
Optional<byte[]> get(S segment, byte[] key) throws StorageException;
|
||||
|
||||
/**
|
||||
* Contains key.
|
||||
*
|
||||
* @param segment the segment
|
||||
* @param key the key
|
||||
* @return the boolean
|
||||
* @throws StorageException the storage exception
|
||||
*/
|
||||
default boolean containsKey(final S segment, final byte[] key) throws StorageException {
|
||||
return get(segment, key).isPresent();
|
||||
}
|
||||
@@ -51,6 +66,7 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
* Begins a transaction. Returns a transaction object that can be updated and committed.
|
||||
*
|
||||
* @return An object representing the transaction.
|
||||
* @throws StorageException the storage exception
|
||||
*/
|
||||
Transaction<S> startTransaction() throws StorageException;
|
||||
|
||||
@@ -62,23 +78,49 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
*/
|
||||
Stream<Pair<byte[], byte[]>> stream(final S segmentHandle);
|
||||
|
||||
/**
|
||||
* Stream keys.
|
||||
*
|
||||
* @param segmentHandle the segment handle
|
||||
* @return the stream
|
||||
*/
|
||||
Stream<byte[]> streamKeys(final S segmentHandle);
|
||||
|
||||
/**
|
||||
* Delete the value corresponding to the given key in the given segment if a write lock can be
|
||||
* instantly acquired on the underlying storage. Do nothing otherwise.
|
||||
*
|
||||
* @param segmentHandle The segment handle whose keys we want to stream.
|
||||
* @param key The key to delete.
|
||||
* @throws StorageException any problem encountered during the deletion attempt.
|
||||
* @return false if the lock on the underlying storage could not be instantly acquired, true
|
||||
* otherwise
|
||||
* @throws StorageException any problem encountered during the deletion attempt.
|
||||
*/
|
||||
boolean tryDelete(S segmentHandle, byte[] key) throws StorageException;
|
||||
|
||||
/**
|
||||
* Gets all keys that matches condition.
|
||||
*
|
||||
* @param segmentHandle the segment handle
|
||||
* @param returnCondition the return condition
|
||||
* @return set of result
|
||||
*/
|
||||
Set<byte[]> getAllKeysThat(S segmentHandle, Predicate<byte[]> returnCondition);
|
||||
|
||||
/**
|
||||
* Gets all values from keys that matches condition.
|
||||
*
|
||||
* @param segmentHandle the segment handle
|
||||
* @param returnCondition the return condition
|
||||
* @return the set of result
|
||||
*/
|
||||
Set<byte[]> getAllValuesFromKeysThat(final S segmentHandle, Predicate<byte[]> returnCondition);
|
||||
|
||||
/**
|
||||
* Clear.
|
||||
*
|
||||
* @param segmentHandle the segment handle
|
||||
*/
|
||||
void clear(S segmentHandle);
|
||||
|
||||
/**
|
||||
@@ -110,6 +152,8 @@ public interface SegmentedKeyValueStorage<S> extends Closeable {
|
||||
* Atomically commit the set of changes contained in this transaction to the underlying
|
||||
* key-value storage from which this transaction was started. After committing, the transaction
|
||||
* is no longer usable and will throw exceptions if modifications are attempted.
|
||||
*
|
||||
* @throws StorageException the storage exception
|
||||
*/
|
||||
void commit() throws StorageException;
|
||||
|
||||
|
||||
@@ -29,12 +29,23 @@ import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
/**
|
||||
* The type Segmented key value storage adapter.
|
||||
*
|
||||
* @param <S> the type parameter
|
||||
*/
|
||||
public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStorage {
|
||||
|
||||
private final S segmentHandle;
|
||||
private final SegmentedKeyValueStorage<S> storage;
|
||||
private final Supplier<SnappedKeyValueStorage> snapshotSupplier;
|
||||
|
||||
/**
|
||||
* Instantiates a new Segmented key value storage adapter.
|
||||
*
|
||||
* @param segment the segment
|
||||
* @param storage the storage
|
||||
*/
|
||||
public SegmentedKeyValueStorageAdapter(
|
||||
final SegmentIdentifier segment, final SegmentedKeyValueStorage<S> storage) {
|
||||
this(
|
||||
@@ -45,6 +56,13 @@ public class SegmentedKeyValueStorageAdapter<S> implements SnappableKeyValueStor
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Segmented key value storage adapter.
|
||||
*
|
||||
* @param segment the segment
|
||||
* @param storage the storage
|
||||
* @param snapshotSupplier the snapshot supplier
|
||||
*/
|
||||
public SegmentedKeyValueStorageAdapter(
|
||||
final SegmentIdentifier segment,
|
||||
final SegmentedKeyValueStorage<S> storage,
|
||||
|
||||
@@ -19,12 +19,22 @@ import static com.google.common.base.Preconditions.checkState;
|
||||
import org.hyperledger.besu.plugin.services.exception.StorageException;
|
||||
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage.Transaction;
|
||||
|
||||
/**
|
||||
* The Segmented key value storage transaction transition validator decorator.
|
||||
*
|
||||
* @param <S> the type parameter
|
||||
*/
|
||||
public class SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<S>
|
||||
implements Transaction<S> {
|
||||
|
||||
private final Transaction<S> transaction;
|
||||
private boolean active = true;
|
||||
|
||||
/**
|
||||
* Instantiates a new Segmented key value storage transaction transition validator decorator.
|
||||
*
|
||||
* @param toDecorate the to decorate
|
||||
*/
|
||||
public SegmentedKeyValueStorageTransactionTransitionValidatorDecorator(
|
||||
final Transaction<S> toDecorate) {
|
||||
this.transaction = toDecorate;
|
||||
|
||||
@@ -21,6 +21,11 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* The Batching read pipe.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
|
||||
|
||||
private final ReadPipe<T> input;
|
||||
@@ -28,11 +33,26 @@ public class BatchingReadPipe<T> implements ReadPipe<List<T>> {
|
||||
private final Counter batchCounter;
|
||||
private final Function<List<T>, Integer> stopBatchCondition;
|
||||
|
||||
/**
|
||||
* Instantiates a new Batching read pipe.
|
||||
*
|
||||
* @param input the input
|
||||
* @param maximumBatchSize the maximum batch size
|
||||
* @param batchCounter the batch counter
|
||||
*/
|
||||
public BatchingReadPipe(
|
||||
final ReadPipe<T> input, final int maximumBatchSize, final Counter batchCounter) {
|
||||
this(input, maximumBatchSize, batchCounter, ts -> maximumBatchSize - ts.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Batching read pipe.
|
||||
*
|
||||
* @param input the input
|
||||
* @param maximumBatchSize the maximum batch size
|
||||
* @param batchCounter the batch counter
|
||||
* @param batchEndCondition the batch end condition
|
||||
*/
|
||||
public BatchingReadPipe(
|
||||
final ReadPipe<T> input,
|
||||
final int maximumBatchSize,
|
||||
|
||||
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
|
||||
* the pipeline aborted.
|
||||
*
|
||||
* <p>In most cases a Pipe is used through one of two narrower interfaces it supports {@link
|
||||
* ReadPipe} and {@link WritePipe}. These are designed to expose only the operations relevant to
|
||||
* ReadPipe}* and {@link WritePipe}. These are designed to expose only the operations relevant to
|
||||
* objects either reading from or publishing to the pipe respectively.
|
||||
*
|
||||
* @param <T> the type of item that flows through the pipe.
|
||||
@@ -45,6 +45,14 @@ public class Pipe<T> implements ReadPipe<T>, WritePipe<T> {
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final AtomicBoolean aborted = new AtomicBoolean();
|
||||
|
||||
/**
|
||||
* Instantiates a new Pipe.
|
||||
*
|
||||
* @param capacity the capacity
|
||||
* @param inputCounter the input counter
|
||||
* @param outputCounter the output counter
|
||||
* @param abortedItemCounter the aborted item counter
|
||||
*/
|
||||
public Pipe(
|
||||
final int capacity,
|
||||
final Counter inputCounter,
|
||||
|
||||
@@ -35,6 +35,11 @@ import io.opentelemetry.api.trace.Tracer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The Pipeline.
|
||||
*
|
||||
* @param <I> the type parameter
|
||||
*/
|
||||
public class Pipeline<I> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
|
||||
private final Pipe<I> inputPipe;
|
||||
@@ -58,6 +63,16 @@ public class Pipeline<I> {
|
||||
private final boolean tracingEnabled;
|
||||
private volatile List<Future<?>> futures;
|
||||
|
||||
/**
|
||||
* Instantiates a new Pipeline.
|
||||
*
|
||||
* @param inputPipe the input pipe
|
||||
* @param name the name
|
||||
* @param tracingEnabled the tracing enabled
|
||||
* @param stages the stages
|
||||
* @param pipes the pipes
|
||||
* @param completerStage the completer stage
|
||||
*/
|
||||
Pipeline(
|
||||
final Pipe<I> inputPipe,
|
||||
final String name,
|
||||
|
||||
@@ -54,6 +54,19 @@ public class PipelineBuilder<I, T> {
|
||||
private final boolean tracingEnabled;
|
||||
private final String pipelineName;
|
||||
|
||||
/**
|
||||
* Instantiates a new Pipeline builder.
|
||||
*
|
||||
* @param inputPipe the input pipe
|
||||
* @param stages the stages
|
||||
* @param pipes the pipes
|
||||
* @param lastStageName the last stage name
|
||||
* @param pipeEnd the pipe end
|
||||
* @param bufferSize the buffer size
|
||||
* @param outputCounter the output counter
|
||||
* @param tracingEnabled the tracing enabled
|
||||
* @param pipelineName the pipeline name
|
||||
*/
|
||||
public PipelineBuilder(
|
||||
final Pipe<I> inputPipe,
|
||||
final Collection<Stage> stages,
|
||||
@@ -81,12 +94,12 @@ public class PipelineBuilder<I, T> {
|
||||
* <i>source</i> returns <code>false</code> from {@link Iterator#hasNext()} and the last item has
|
||||
* been reached the end of the pipeline.
|
||||
*
|
||||
* @param <T> the type of items input into the pipeline.
|
||||
* @param sourceName the name of this stage. Used as the label for the output count metric.
|
||||
* @param source the source to pull items from for processing.
|
||||
* @param bufferSize the number of items to be buffered between each stage in the pipeline.
|
||||
* @param itemCounter the counter to increment for each output of a stage. Must accept two labels,
|
||||
* the stage name and action (output or drained).
|
||||
* @param <T> the type of items input into the pipeline.
|
||||
* @param tracingEnabled whether this pipeline should be traced
|
||||
* @param pipelineName the name of the pipeline for tracing purposes
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
@@ -116,11 +129,11 @@ public class PipelineBuilder<I, T> {
|
||||
* Create a new pipeline that processes inputs added to <i>pipe</i>. The pipeline completes when
|
||||
* <i>pipe</i> is closed and the last item has been reached the end of the pipeline.
|
||||
*
|
||||
* @param <T> the type of items input into the pipeline.
|
||||
* @param sourceName the name of this stage. Used as the label for the output count metric.
|
||||
* @param bufferSize the number of items to be buffered between each stage in the pipeline.
|
||||
* @param outputCounter the counter to increment for each output of a stage. Must have a single
|
||||
* label which will be filled with the stage name.
|
||||
* @param <T> the type of items input into the pipeline.
|
||||
* @param tracingEnabled whether this pipeline should be traced
|
||||
* @param pipelineName the name of the pipeline for tracing purposes
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
@@ -148,9 +161,9 @@ public class PipelineBuilder<I, T> {
|
||||
* Adds a 1-to-1 processing stage to the pipeline. A single thread processes each item in the
|
||||
* pipeline with <i>processor</i> outputting its return value to the next stage.
|
||||
*
|
||||
* @param <O> the output type for this processing step.
|
||||
* @param stageName the name of this stage. Used as the label for the output count metric.
|
||||
* @param processor the processing to apply to each item.
|
||||
* @param <O> the output type for this processing step.
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
*/
|
||||
public <O> PipelineBuilder<I, O> thenProcess(
|
||||
@@ -165,10 +178,10 @@ public class PipelineBuilder<I, T> {
|
||||
*
|
||||
* <p>Note: The order of items is not preserved.
|
||||
*
|
||||
* @param <O> the output type for this processing step.
|
||||
* @param stageName the name of this stage. Used as the label for the output count metric.
|
||||
* @param processor the processing to apply to each item.
|
||||
* @param numberOfThreads the number of threads to use for processing.
|
||||
* @param <O> the output type for this processing step.
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
*/
|
||||
public <O> PipelineBuilder<I, O> thenProcessInParallel(
|
||||
@@ -188,10 +201,10 @@ public class PipelineBuilder<I, T> {
|
||||
*
|
||||
* <p>Note: The order of items is not preserved.
|
||||
*
|
||||
* @param <O> the output type for this processing step.
|
||||
* @param stageName the name of this stage. Used as the label for the output count metric.
|
||||
* @param processor the processing to apply to each item.
|
||||
* @param maxConcurrency the maximum number of items being processed concurrently.
|
||||
* @param <O> the output type for this processing step.
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
*/
|
||||
public <O> PipelineBuilder<I, O> thenProcessAsync(
|
||||
@@ -212,10 +225,10 @@ public class PipelineBuilder<I, T> {
|
||||
*
|
||||
* <p>Note: While processing may occur concurrently, order is preserved when results are output.
|
||||
*
|
||||
* @param <O> the output type for this processing step.
|
||||
* @param stageName the name of this stage. Used as the label for the output count metric.
|
||||
* @param processor the processing to apply to each item.
|
||||
* @param maxConcurrency the maximum number of items being processed concurrently.
|
||||
* @param <O> the output type for this processing step.
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
*/
|
||||
public <O> PipelineBuilder<I, O> thenProcessAsyncOrdered(
|
||||
@@ -292,10 +305,10 @@ public class PipelineBuilder<I, T> {
|
||||
*
|
||||
* <pre>thenFlatMap(List::stream, newBufferSize)</pre>
|
||||
*
|
||||
* @param <O> the type of items to be output from this stage.
|
||||
* @param stageName the name of this stage. Used as the label for the output count metric.
|
||||
* @param mapper the function to process each item with.
|
||||
* @param newBufferSize the output buffer size to use from this stage onwards.
|
||||
* @param <O> the type of items to be output from this stage.
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
*/
|
||||
public <O> PipelineBuilder<I, O> thenFlatMap(
|
||||
@@ -313,11 +326,11 @@ public class PipelineBuilder<I, T> {
|
||||
*
|
||||
* <pre>thenFlatMap(List::stream, newBufferSize)</pre>
|
||||
*
|
||||
* @param <O> the type of items to be output from this stage.
|
||||
* @param stageName the name of this stage. Used as the label for the output count metric.
|
||||
* @param mapper the function to process each item with.
|
||||
* @param numberOfThreads the number of threads to use for processing.
|
||||
* @param newBufferSize the output buffer size to use from this stage onwards.
|
||||
* @param <O> the type of items to be output from this stage.
|
||||
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
|
||||
*/
|
||||
public <O> PipelineBuilder<I, O> thenFlatMapInParallel(
|
||||
|
||||
@@ -14,6 +14,12 @@
|
||||
*/
|
||||
package org.hyperledger.besu.services.pipeline;
|
||||
|
||||
/** The interface Stage. */
|
||||
public interface Stage extends Runnable {
|
||||
/**
|
||||
* Gets name.
|
||||
*
|
||||
* @return the name
|
||||
*/
|
||||
String getName();
|
||||
}
|
||||
|
||||
@@ -20,9 +20,16 @@ import java.util.HashSet;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The Caching task collection.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public class CachingTaskCollection<T> implements TaskCollection<T> {
|
||||
|
||||
/** The constant DEFAULT_CACHE_SIZE. */
|
||||
public static final int DEFAULT_CACHE_SIZE = 1_000_000;
|
||||
|
||||
private final int maxCacheSize;
|
||||
|
||||
// The underlying collection
|
||||
@@ -36,11 +43,22 @@ public class CachingTaskCollection<T> implements TaskCollection<T> {
|
||||
|
||||
private boolean closed = false;
|
||||
|
||||
/**
|
||||
* Instantiates a new Caching task collection.
|
||||
*
|
||||
* @param collection the collection
|
||||
* @param maxCacheSize the max cache size
|
||||
*/
|
||||
public CachingTaskCollection(final TaskCollection<T> collection, final int maxCacheSize) {
|
||||
this.wrappedCollection = collection;
|
||||
this.maxCacheSize = maxCacheSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Caching task collection.
|
||||
*
|
||||
* @param collection the collection
|
||||
*/
|
||||
public CachingTaskCollection(final TaskCollection<T> collection) {
|
||||
this(collection, DEFAULT_CACHE_SIZE);
|
||||
}
|
||||
@@ -83,6 +101,11 @@ public class CachingTaskCollection<T> implements TaskCollection<T> {
|
||||
return wrappedCollection.size() + cache.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache size.
|
||||
*
|
||||
* @return the cache size
|
||||
*/
|
||||
public synchronized int cacheSize() {
|
||||
return outstandingTasks.size() + cache.size();
|
||||
}
|
||||
|
||||
@@ -22,6 +22,11 @@ import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* The InMemory task queue.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public class InMemoryTaskQueue<T> implements TaskCollection<T> {
|
||||
private final Queue<T> internalQueue = new ArrayDeque<>();
|
||||
private final Set<InMemoryTask<T>> unfinishedOutstandingTasks = new HashSet<>();
|
||||
@@ -65,6 +70,7 @@ public class InMemoryTaskQueue<T> implements TaskCollection<T> {
|
||||
internalQueue.clear();
|
||||
}
|
||||
|
||||
/** Clear internal queue. */
|
||||
public void clearInternalQueue() {
|
||||
internalQueue.clear();
|
||||
}
|
||||
@@ -87,6 +93,11 @@ public class InMemoryTaskQueue<T> implements TaskCollection<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return task queue as list.
|
||||
*
|
||||
* @return the list
|
||||
*/
|
||||
public synchronized List<T> asList() {
|
||||
return new ArrayList<>(internalQueue);
|
||||
}
|
||||
@@ -106,6 +117,12 @@ public class InMemoryTaskQueue<T> implements TaskCollection<T> {
|
||||
private final InMemoryTaskQueue<T> queue;
|
||||
private final AtomicBoolean completed = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* Instantiates a new InMemory task.
|
||||
*
|
||||
* @param queue the queue
|
||||
* @param data the data
|
||||
*/
|
||||
public InMemoryTask(final InMemoryTaskQueue<T> queue, final T data) {
|
||||
this.queue = queue;
|
||||
this.data = data;
|
||||
|
||||
@@ -24,16 +24,23 @@ import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
/**
|
||||
* The InMemory tasks priority queues.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public class InMemoryTasksPriorityQueues<T extends TasksPriorityProvider>
|
||||
implements TaskCollection<T> {
|
||||
private final List<PriorityQueue<T>> internalQueues = new ArrayList<>(16);
|
||||
private final Set<InMemoryTask<T>> unfinishedOutstandingTasks = new HashSet<>();
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
/** Instantiates a new InMemory tasks priority queues. */
|
||||
public InMemoryTasksPriorityQueues() {
|
||||
clearInternalQueues();
|
||||
}
|
||||
|
||||
/** Clear internal queues. */
|
||||
public synchronized void clearInternalQueues() {
|
||||
internalQueues.clear();
|
||||
for (int i = 0; i < 16; i++) {
|
||||
@@ -130,6 +137,12 @@ public class InMemoryTasksPriorityQueues<T extends TasksPriorityProvider>
|
||||
return unfinishedOutstandingTasks.remove(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if queue Contains the task .
|
||||
*
|
||||
* @param request the request
|
||||
* @return the boolean
|
||||
*/
|
||||
public synchronized boolean contains(final T request) {
|
||||
final PriorityQueue<T> queue = findQueue(request.getDepth());
|
||||
return queue.contains(request)
|
||||
@@ -143,6 +156,12 @@ public class InMemoryTasksPriorityQueues<T extends TasksPriorityProvider>
|
||||
private final InMemoryTasksPriorityQueues<T> queue;
|
||||
private final AtomicBoolean completed = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* Instantiates a new InMemory task.
|
||||
*
|
||||
* @param queue the queue
|
||||
* @param data the data
|
||||
*/
|
||||
public InMemoryTask(final InMemoryTasksPriorityQueues<T> queue, final T data) {
|
||||
this.queue = queue;
|
||||
this.data = data;
|
||||
|
||||
@@ -14,7 +14,17 @@
|
||||
*/
|
||||
package org.hyperledger.besu.services.tasks;
|
||||
|
||||
/**
|
||||
* The interface Task.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public interface Task<T> {
|
||||
/**
|
||||
* Gets data.
|
||||
*
|
||||
* @return the data
|
||||
*/
|
||||
T getData();
|
||||
|
||||
/** Mark this task as completed. */
|
||||
|
||||
@@ -16,6 +16,11 @@ package org.hyperledger.besu.services.tasks;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
* The interface Task collection.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public interface TaskCollection<T> extends Closeable {
|
||||
/**
|
||||
* Add some data that needs to be processed.
|
||||
@@ -26,7 +31,7 @@ public interface TaskCollection<T> extends Closeable {
|
||||
|
||||
/**
|
||||
* Get a task for processing. This task will be tracked as a pending task until either {@code
|
||||
* Task.markCompleted} or {@code Task.requeue} is called.
|
||||
* Task.markCompleted}* or {@code Task.requeue} is called.
|
||||
*
|
||||
* @return The task to be processed.
|
||||
*/
|
||||
|
||||
@@ -17,8 +17,19 @@
|
||||
|
||||
package org.hyperledger.besu.services.tasks;
|
||||
|
||||
/** The interface Tasks priority provider. */
|
||||
public interface TasksPriorityProvider {
|
||||
/**
|
||||
* Gets priority.
|
||||
*
|
||||
* @return the priority
|
||||
*/
|
||||
long getPriority();
|
||||
|
||||
/**
|
||||
* Gets depth.
|
||||
*
|
||||
* @return the depth
|
||||
*/
|
||||
int getDepth();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user