mirror of
https://github.com/vacp2p/linea-besu.git
synced 2026-01-09 15:37:54 -05:00
NC-1721: Filter timeout if not queried for 10 minutes (#66)
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
This commit is contained in:
@@ -14,6 +14,7 @@ import tech.pegasys.pantheon.ethereum.db.DefaultMutableBlockchain;
|
||||
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
|
||||
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
|
||||
@@ -66,7 +67,8 @@ public class JsonRpcTestMethodsFactory {
|
||||
final P2PNetwork peerDiscovery = mock(P2PNetwork.class);
|
||||
final TransactionPool transactionPool = mock(TransactionPool.class);
|
||||
final FilterManager filterManager =
|
||||
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
|
||||
new FilterManager(
|
||||
blockchainQueries, transactionPool, new FilterIdGenerator(), new FilterRepository());
|
||||
final EthHashMiningCoordinator miningCoordinator = mock(EthHashMiningCoordinator.class);
|
||||
|
||||
return new JsonRpcMethodsFactory()
|
||||
|
||||
@@ -25,6 +25,7 @@ import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.EthGetFilterChanges;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.parameters.JsonRpcParameter;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
|
||||
@@ -81,7 +82,9 @@ public class EthGetFilterChangesIntegrationTest {
|
||||
transactions, genesisConfig.getProtocolSchedule(), protocolContext, batchAddedListener);
|
||||
final BlockchainQueries blockchainQueries =
|
||||
new BlockchainQueries(blockchain, worldStateArchive);
|
||||
filterManager = new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
|
||||
filterManager =
|
||||
new FilterManager(
|
||||
blockchainQueries, transactionPool, new FilterIdGenerator(), new FilterRepository());
|
||||
method = new EthGetFilterChanges(filterManager, parameters);
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import tech.pegasys.pantheon.ethereum.core.Synchronizer;
|
||||
import tech.pegasys.pantheon.ethereum.core.TransactionPool;
|
||||
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.AdminPeers;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.DebugStorageRangeAt;
|
||||
@@ -85,11 +84,10 @@ public class JsonRpcMethodsFactory {
|
||||
final ProtocolSchedule<?> protocolSchedule,
|
||||
final AbstractMiningCoordinator<?, ?> miningCoordinator,
|
||||
final Set<Capability> supportedCapabilities,
|
||||
final Collection<RpcApis> rpcApis) {
|
||||
final Collection<RpcApis> rpcApis,
|
||||
final FilterManager filterManager) {
|
||||
final BlockchainQueries blockchainQueries =
|
||||
new BlockchainQueries(blockchain, worldStateArchive);
|
||||
final FilterManager filterManager =
|
||||
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
|
||||
return methods(
|
||||
clientVersion,
|
||||
chainId,
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.Hash;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/** Tracks new blocks being added to the blockchain. */
|
||||
class BlockFilter extends Filter {
|
||||
|
||||
private final List<Hash> blockHashes = new ArrayList<>();
|
||||
|
||||
BlockFilter(final String id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
void addBlockHash(final Hash hash) {
|
||||
blockHashes.add(hash);
|
||||
}
|
||||
|
||||
List<Hash> blockHashes() {
|
||||
return blockHashes;
|
||||
}
|
||||
|
||||
void clearBlockHashes() {
|
||||
blockHashes.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
abstract class Filter {
|
||||
|
||||
private static final Duration DEFAULT_EXPIRE_DURATION = Duration.ofMinutes(10);
|
||||
|
||||
private final String id;
|
||||
private Instant expireTime;
|
||||
|
||||
Filter(final String id) {
|
||||
this.id = id;
|
||||
resetExpireTime();
|
||||
}
|
||||
|
||||
String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
void resetExpireTime() {
|
||||
this.expireTime = Instant.now().plus(DEFAULT_EXPIRE_DURATION);
|
||||
}
|
||||
|
||||
boolean isExpired() {
|
||||
return Instant.now().isAfter(expireTime);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setExpireTime(final Instant expireTime) {
|
||||
this.expireTime = expireTime;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Instant getExpireTime() {
|
||||
return expireTime;
|
||||
}
|
||||
}
|
||||
@@ -12,118 +12,52 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.LogWithMetadata;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
|
||||
/** Manages JSON-RPC filter events. */
|
||||
public class FilterManager {
|
||||
public class FilterManager extends AbstractVerticle {
|
||||
|
||||
private final Map<String, BlockFilter> blockFilters = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<String, PendingTransactionFilter> pendingTransactionFilters =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<String, LogFilter> logFilters = new ConcurrentHashMap<>();
|
||||
private static final int FILTER_TIMEOUT_CHECK_TIMER = 10000;
|
||||
|
||||
private final FilterIdGenerator filterIdGenerator;
|
||||
|
||||
/** Tracks new blocks being added to the blockchain. */
|
||||
private static class BlockFilter {
|
||||
|
||||
private final List<Hash> blockHashes = new ArrayList<>();
|
||||
|
||||
BlockFilter() {}
|
||||
|
||||
void addBlockHash(final Hash hash) {
|
||||
blockHashes.add(hash);
|
||||
}
|
||||
|
||||
List<Hash> blockHashes() {
|
||||
return blockHashes;
|
||||
}
|
||||
|
||||
void clearBlockHashes() {
|
||||
blockHashes.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/** Tracks new pending transactions that have arrived in the transaction pool */
|
||||
private static class PendingTransactionFilter {
|
||||
|
||||
private final List<Hash> transactionHashes = new ArrayList<>();
|
||||
|
||||
PendingTransactionFilter() {}
|
||||
|
||||
void addTransactionHash(final Hash hash) {
|
||||
transactionHashes.add(hash);
|
||||
}
|
||||
|
||||
List<Hash> transactionHashes() {
|
||||
return transactionHashes;
|
||||
}
|
||||
|
||||
void clearTransactionHashes() {
|
||||
transactionHashes.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/** Tracks new log events. */
|
||||
private static class LogFilter {
|
||||
|
||||
private final BlockParameter fromBlock;
|
||||
private final BlockParameter toBlock;
|
||||
private final LogsQuery logsQuery;
|
||||
|
||||
private final List<LogWithMetadata> logs = new ArrayList<>();
|
||||
|
||||
LogFilter(
|
||||
final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) {
|
||||
this.fromBlock = fromBlock;
|
||||
this.toBlock = toBlock;
|
||||
this.logsQuery = logsQuery;
|
||||
}
|
||||
|
||||
public BlockParameter getFromBlock() {
|
||||
return fromBlock;
|
||||
}
|
||||
|
||||
public BlockParameter getToBlock() {
|
||||
return toBlock;
|
||||
}
|
||||
|
||||
public LogsQuery getLogsQuery() {
|
||||
return logsQuery;
|
||||
}
|
||||
|
||||
void addLog(final List<LogWithMetadata> logs) {
|
||||
this.logs.addAll(logs);
|
||||
}
|
||||
|
||||
List<LogWithMetadata> logs() {
|
||||
return logs;
|
||||
}
|
||||
|
||||
void clearLogs() {
|
||||
logs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private final FilterRepository filterRepository;
|
||||
private final BlockchainQueries blockchainQueries;
|
||||
|
||||
public FilterManager(
|
||||
final BlockchainQueries blockchainQueries,
|
||||
final TransactionPool transactionPool,
|
||||
final FilterIdGenerator filterIdGenerator) {
|
||||
final FilterIdGenerator filterIdGenerator,
|
||||
final FilterRepository filterRepository) {
|
||||
this.filterIdGenerator = filterIdGenerator;
|
||||
this.filterRepository = filterRepository;
|
||||
checkNotNull(blockchainQueries.getBlockchain());
|
||||
blockchainQueries.getBlockchain().observeBlockAdded(this::recordBlockEvent);
|
||||
transactionPool.addTransactionListener(this::recordPendingTransactionEvent);
|
||||
this.blockchainQueries = blockchainQueries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
startFilterTimeoutTimer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
filterRepository.deleteAll();
|
||||
}
|
||||
|
||||
private void startFilterTimeoutTimer() {
|
||||
vertx.setPeriodic(
|
||||
FILTER_TIMEOUT_CHECK_TIMER,
|
||||
timerId ->
|
||||
vertx.executeBlocking(
|
||||
future -> new FilterTimeoutMonitor(filterRepository).checkFilters(), result -> {}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Installs a new block filter
|
||||
*
|
||||
@@ -131,7 +65,7 @@ public class FilterManager {
|
||||
*/
|
||||
public String installBlockFilter() {
|
||||
final String filterId = filterIdGenerator.nextId();
|
||||
blockFilters.put(filterId, new BlockFilter());
|
||||
filterRepository.save(new BlockFilter(filterId));
|
||||
return filterId;
|
||||
}
|
||||
|
||||
@@ -142,7 +76,7 @@ public class FilterManager {
|
||||
*/
|
||||
public String installPendingTransactionFilter() {
|
||||
final String filterId = filterIdGenerator.nextId();
|
||||
pendingTransactionFilters.put(filterId, new PendingTransactionFilter());
|
||||
filterRepository.save(new PendingTransactionFilter(filterId));
|
||||
return filterId;
|
||||
}
|
||||
|
||||
@@ -157,7 +91,7 @@ public class FilterManager {
|
||||
public String installLogFilter(
|
||||
final BlockParameter fromBlock, final BlockParameter toBlock, final LogsQuery logsQuery) {
|
||||
final String filterId = filterIdGenerator.nextId();
|
||||
logFilters.put(filterId, new LogFilter(fromBlock, toBlock, logsQuery));
|
||||
filterRepository.save(new LogFilter(filterId, fromBlock, toBlock, logsQuery));
|
||||
return filterId;
|
||||
}
|
||||
|
||||
@@ -168,15 +102,19 @@ public class FilterManager {
|
||||
* @return {@code true} if the filter was successfully removed; otherwise {@code false}
|
||||
*/
|
||||
public boolean uninstallFilter(final String filterId) {
|
||||
return blockFilters.remove(filterId) != null
|
||||
|| pendingTransactionFilters.remove(filterId) != null
|
||||
|| logFilters.remove(filterId) != null;
|
||||
if (filterRepository.exists(filterId)) {
|
||||
filterRepository.delete(filterId);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void recordBlockEvent(final BlockAddedEvent event, final Blockchain blockchain) {
|
||||
final Hash blockHash = event.getBlock().getHash();
|
||||
Collection<BlockFilter> blockFilters = filterRepository.getFiltersOfType(BlockFilter.class);
|
||||
blockFilters.forEach(
|
||||
(filterId, filter) -> {
|
||||
(filter) -> {
|
||||
synchronized (filter) {
|
||||
filter.addBlockHash(blockHash);
|
||||
}
|
||||
@@ -186,8 +124,9 @@ public class FilterManager {
|
||||
}
|
||||
|
||||
private void checkBlockchainForMatchingLogsForFilters() {
|
||||
Collection<LogFilter> logFilters = filterRepository.getFiltersOfType(LogFilter.class);
|
||||
logFilters.forEach(
|
||||
(filterId, filter) -> {
|
||||
(filter) -> {
|
||||
final long headBlockNumber = blockchainQueries.headBlockNumber();
|
||||
final long toBlockNumber =
|
||||
filter.getToBlock().getNumber().orElse(blockchainQueries.headBlockNumber());
|
||||
@@ -199,12 +138,14 @@ public class FilterManager {
|
||||
|
||||
@VisibleForTesting
|
||||
void recordPendingTransactionEvent(final Transaction transaction) {
|
||||
Collection<PendingTransactionFilter> pendingTransactionFilters =
|
||||
filterRepository.getFiltersOfType(PendingTransactionFilter.class);
|
||||
if (pendingTransactionFilters.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
pendingTransactionFilters.forEach(
|
||||
(filterId, filter) -> {
|
||||
(filter) -> {
|
||||
synchronized (filter) {
|
||||
filter.addTransactionHash(transaction.hash());
|
||||
}
|
||||
@@ -218,7 +159,7 @@ public class FilterManager {
|
||||
* @return the new block hashes that have occurred since the filter was last checked
|
||||
*/
|
||||
public List<Hash> blockChanges(final String filterId) {
|
||||
final BlockFilter filter = blockFilters.get(filterId);
|
||||
final BlockFilter filter = filterRepository.getFilter(filterId, BlockFilter.class).orElse(null);
|
||||
if (filter == null) {
|
||||
return null;
|
||||
}
|
||||
@@ -227,6 +168,7 @@ public class FilterManager {
|
||||
synchronized (filter) {
|
||||
hashes = new ArrayList<>(filter.blockHashes());
|
||||
filter.clearBlockHashes();
|
||||
filter.resetExpireTime();
|
||||
}
|
||||
return hashes;
|
||||
}
|
||||
@@ -238,7 +180,8 @@ public class FilterManager {
|
||||
* @return the new pending transaction hashes that have occurred since the filter was last checked
|
||||
*/
|
||||
public List<Hash> pendingTransactionChanges(final String filterId) {
|
||||
final PendingTransactionFilter filter = pendingTransactionFilters.get(filterId);
|
||||
final PendingTransactionFilter filter =
|
||||
filterRepository.getFilter(filterId, PendingTransactionFilter.class).orElse(null);
|
||||
if (filter == null) {
|
||||
return null;
|
||||
}
|
||||
@@ -247,12 +190,13 @@ public class FilterManager {
|
||||
synchronized (filter) {
|
||||
hashes = new ArrayList<>(filter.transactionHashes());
|
||||
filter.clearTransactionHashes();
|
||||
filter.resetExpireTime();
|
||||
}
|
||||
return hashes;
|
||||
}
|
||||
|
||||
public List<LogWithMetadata> logsChanges(final String filterId) {
|
||||
final LogFilter filter = logFilters.get(filterId);
|
||||
final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null);
|
||||
if (filter == null) {
|
||||
return null;
|
||||
}
|
||||
@@ -261,14 +205,17 @@ public class FilterManager {
|
||||
synchronized (filter) {
|
||||
logs = new ArrayList<>(filter.logs());
|
||||
filter.clearLogs();
|
||||
filter.resetExpireTime();
|
||||
}
|
||||
return logs;
|
||||
}
|
||||
|
||||
public List<LogWithMetadata> logs(final String filterId) {
|
||||
final LogFilter filter = logFilters.get(filterId);
|
||||
final LogFilter filter = filterRepository.getFilter(filterId, LogFilter.class).orElse(null);
|
||||
if (filter == null) {
|
||||
return null;
|
||||
} else {
|
||||
filter.resetExpireTime();
|
||||
}
|
||||
|
||||
final long fromBlockNumber =
|
||||
@@ -278,19 +225,4 @@ public class FilterManager {
|
||||
|
||||
return blockchainQueries.matchingLogs(fromBlockNumber, toBlockNumber, filter.getLogsQuery());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int blockFilterCount() {
|
||||
return blockFilters.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int pendingTransactionFilterCount() {
|
||||
return pendingTransactionFilters.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int logFilterCount() {
|
||||
return logFilters.size();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class FilterRepository {
|
||||
|
||||
private final Map<String, Filter> filters = new ConcurrentHashMap<>();
|
||||
|
||||
public FilterRepository() {}
|
||||
|
||||
Collection<Filter> getFilters() {
|
||||
return new ArrayList<>(filters.values());
|
||||
}
|
||||
|
||||
<T extends Filter> Collection<T> getFiltersOfType(final Class<T> filterClass) {
|
||||
return filters
|
||||
.values()
|
||||
.stream()
|
||||
.flatMap(f -> getIfTypeMatches(f, filterClass).map(Stream::of).orElseGet(Stream::empty))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
<T extends Filter> Optional<T> getFilter(final String filterId, final Class<T> filterClass) {
|
||||
final Filter filter = filters.get(filterId);
|
||||
return getIfTypeMatches(filter, filterClass);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T extends Filter> Optional<T> getIfTypeMatches(
|
||||
final Filter filter, final Class<T> filterClass) {
|
||||
if (filter == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
if (!filterClass.isAssignableFrom(filter.getClass())) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of((T) filter);
|
||||
}
|
||||
|
||||
boolean exists(final String id) {
|
||||
return filters.containsKey(id);
|
||||
}
|
||||
|
||||
void save(final Filter filter) {
|
||||
if (filter == null) {
|
||||
throw new IllegalArgumentException("Can't save null filter");
|
||||
}
|
||||
|
||||
if (exists(filter.getId())) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("Filter with id %s already exists", filter.getId()));
|
||||
}
|
||||
|
||||
filters.put(filter.getId(), filter);
|
||||
}
|
||||
|
||||
void delete(final String id) {
|
||||
filters.remove(id);
|
||||
}
|
||||
|
||||
void deleteAll() {
|
||||
filters.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
class FilterTimeoutMonitor {
|
||||
|
||||
private final FilterRepository filterRepository;
|
||||
|
||||
FilterTimeoutMonitor(final FilterRepository filterRepository) {
|
||||
this.filterRepository = filterRepository;
|
||||
}
|
||||
|
||||
void checkFilters() {
|
||||
filterRepository
|
||||
.getFilters()
|
||||
.forEach(
|
||||
filter -> {
|
||||
if (filter.isExpired()) {
|
||||
filterRepository.delete(filter.getId());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.parameters.BlockParameter;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.LogWithMetadata;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
class LogFilter extends Filter {
|
||||
|
||||
private final BlockParameter fromBlock;
|
||||
private final BlockParameter toBlock;
|
||||
private final LogsQuery logsQuery;
|
||||
|
||||
private final List<LogWithMetadata> logs = new ArrayList<>();
|
||||
|
||||
LogFilter(
|
||||
final String id,
|
||||
final BlockParameter fromBlock,
|
||||
final BlockParameter toBlock,
|
||||
final LogsQuery logsQuery) {
|
||||
super(id);
|
||||
this.fromBlock = fromBlock;
|
||||
this.toBlock = toBlock;
|
||||
this.logsQuery = logsQuery;
|
||||
}
|
||||
|
||||
public BlockParameter getFromBlock() {
|
||||
return fromBlock;
|
||||
}
|
||||
|
||||
public BlockParameter getToBlock() {
|
||||
return toBlock;
|
||||
}
|
||||
|
||||
public LogsQuery getLogsQuery() {
|
||||
return logsQuery;
|
||||
}
|
||||
|
||||
void addLog(final List<LogWithMetadata> logs) {
|
||||
this.logs.addAll(logs);
|
||||
}
|
||||
|
||||
List<LogWithMetadata> logs() {
|
||||
return logs;
|
||||
}
|
||||
|
||||
void clearLogs() {
|
||||
logs.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.Hash;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/** Tracks new pending transactions that have arrived in the transaction pool */
|
||||
class PendingTransactionFilter extends Filter {
|
||||
|
||||
private final List<Hash> transactionHashes = new ArrayList<>();
|
||||
|
||||
PendingTransactionFilter(final String id) {
|
||||
super(id);
|
||||
}
|
||||
|
||||
void addTransactionHash(final Hash hash) {
|
||||
transactionHashes.add(hash);
|
||||
}
|
||||
|
||||
List<Hash> transactionHashes() {
|
||||
return transactionHashes;
|
||||
}
|
||||
|
||||
void clearTransactionHashes() {
|
||||
transactionHashes.clear();
|
||||
}
|
||||
}
|
||||
@@ -25,9 +25,6 @@ import org.apache.logging.log4j.Logger;
|
||||
/**
|
||||
* The SubscriptionManager is responsible for managing subscriptions and sending messages to the
|
||||
* clients that have an active subscription subscription.
|
||||
*
|
||||
* <p>TODO: The logic to send a notification to a client that has an active subscription TODO:
|
||||
* handle connection close (remove subscriptions)
|
||||
*/
|
||||
public class SubscriptionManager extends AbstractVerticle {
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
|
||||
import tech.pegasys.pantheon.ethereum.mainnet.HeaderValidationMode;
|
||||
@@ -146,8 +147,11 @@ public abstract class AbstractEthJsonRpcHttpServiceTest {
|
||||
|
||||
final BlockchainQueries blockchainQueries = new BlockchainQueries(blockchain, stateArchive);
|
||||
final FilterIdGenerator filterIdGenerator = mock(FilterIdGenerator.class);
|
||||
final FilterRepository filterRepository = new FilterRepository();
|
||||
when(filterIdGenerator.nextId()).thenReturn("0x1");
|
||||
filterManager = new FilterManager(blockchainQueries, transactionPoolMock, filterIdGenerator);
|
||||
filterManager =
|
||||
new FilterManager(
|
||||
blockchainQueries, transactionPoolMock, filterIdGenerator, filterRepository);
|
||||
|
||||
final Set<Capability> supportedCapabilities = new HashSet<>();
|
||||
supportedCapabilities.add(EthProtocol.ETH62);
|
||||
|
||||
@@ -5,6 +5,8 @@ import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.refEq;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
@@ -22,12 +24,14 @@ import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
|
||||
import tech.pegasys.pantheon.util.bytes.BytesValue;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Spy;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
@@ -38,23 +42,23 @@ public class FilterManagerLogFilterTest {
|
||||
@Mock private Blockchain blockchain;
|
||||
@Mock private BlockchainQueries blockchainQueries;
|
||||
@Mock private TransactionPool transactionPool;
|
||||
@Spy private final FilterRepository filterRepository = new FilterRepository();
|
||||
|
||||
@Before
|
||||
public void setupTest() {
|
||||
when(blockchainQueries.getBlockchain()).thenReturn(blockchain);
|
||||
this.filterManager =
|
||||
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
|
||||
new FilterManager(
|
||||
blockchainQueries, transactionPool, new FilterIdGenerator(), filterRepository);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void installUninstallNewLogFilter() {
|
||||
assertThat(filterManager.logFilterCount()).isEqualTo(0);
|
||||
|
||||
final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
|
||||
assertThat(filterManager.logFilterCount()).isEqualTo(1);
|
||||
assertThat(filterRepository.exists(filterId)).isTrue();
|
||||
|
||||
assertThat(filterManager.uninstallFilter(filterId)).isTrue();
|
||||
assertThat(filterManager.logFilterCount()).isEqualTo(0);
|
||||
assertThat(filterRepository.exists(filterId)).isFalse();
|
||||
|
||||
assertThat(filterManager.blockChanges(filterId)).isNull();
|
||||
}
|
||||
@@ -149,6 +153,26 @@ public class FilterManagerLogFilterTest {
|
||||
assertThat(retrievedLogs).isEqualToComparingFieldByFieldRecursively(Lists.newArrayList(log));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getLogsChangesShouldResetFilterExpireDate() {
|
||||
LogFilter filter = spy(new LogFilter("foo", latest(), latest(), logsQuery()));
|
||||
doReturn(Optional.of(filter)).when(filterRepository).getFilter(eq("foo"), eq(LogFilter.class));
|
||||
|
||||
filterManager.logsChanges("foo");
|
||||
|
||||
verify(filter).resetExpireTime();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getLogsShouldResetFilterExpireDate() {
|
||||
LogFilter filter = spy(new LogFilter("foo", latest(), latest(), logsQuery()));
|
||||
doReturn(Optional.of(filter)).when(filterRepository).getFilter(eq("foo"), eq(LogFilter.class));
|
||||
|
||||
filterManager.logs("foo");
|
||||
|
||||
verify(filter).resetExpireTime();
|
||||
}
|
||||
|
||||
private LogWithMetadata logWithMetadata() {
|
||||
return LogWithMetadata.create(
|
||||
0,
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
|
||||
@@ -13,12 +18,14 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries
|
||||
import tech.pegasys.pantheon.ethereum.testutil.BlockDataGenerator;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Spy;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
@@ -31,6 +38,7 @@ public class FilterManagerTest {
|
||||
@Mock private Blockchain blockchain;
|
||||
@Mock private BlockchainQueries blockchainQueries;
|
||||
@Mock private TransactionPool transactionPool;
|
||||
@Spy final FilterRepository filterRepository = new FilterRepository();
|
||||
|
||||
@Before
|
||||
public void setupTest() {
|
||||
@@ -38,7 +46,8 @@ public class FilterManagerTest {
|
||||
this.blockGenerator = new BlockDataGenerator();
|
||||
this.currentBlock = blockGenerator.genesisBlock();
|
||||
this.filterManager =
|
||||
new FilterManager(blockchainQueries, transactionPool, new FilterIdGenerator());
|
||||
new FilterManager(
|
||||
blockchainQueries, transactionPool, new FilterIdGenerator(), filterRepository);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -48,13 +57,11 @@ public class FilterManagerTest {
|
||||
|
||||
@Test
|
||||
public void installUninstallNewBlockFilter() {
|
||||
assertThat(filterManager.blockFilterCount()).isEqualTo(0);
|
||||
|
||||
final String filterId = filterManager.installBlockFilter();
|
||||
assertThat(filterManager.blockFilterCount()).isEqualTo(1);
|
||||
assertThat(filterRepository.exists(filterId)).isTrue();
|
||||
|
||||
assertThat(filterManager.uninstallFilter(filterId)).isTrue();
|
||||
assertThat(filterManager.blockFilterCount()).isEqualTo(0);
|
||||
assertThat(filterRepository.exists(filterId)).isFalse();
|
||||
|
||||
assertThat(filterManager.blockChanges(filterId)).isNull();
|
||||
}
|
||||
@@ -117,15 +124,11 @@ public class FilterManagerTest {
|
||||
|
||||
@Test
|
||||
public void installUninstallPendingTransactionFilter() {
|
||||
assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(0);
|
||||
|
||||
final String filterId = filterManager.installPendingTransactionFilter();
|
||||
assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(1);
|
||||
verify(filterRepository).save(any(Filter.class));
|
||||
|
||||
assertThat(filterManager.uninstallFilter(filterId)).isTrue();
|
||||
assertThat(filterManager.pendingTransactionFilterCount()).isEqualTo(0);
|
||||
|
||||
assertThat(filterManager.pendingTransactionChanges(filterId)).isNull();
|
||||
verify(filterRepository).delete(eq(filterId));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -184,6 +187,30 @@ public class FilterManagerTest {
|
||||
assertThat(filterManager.pendingTransactionChanges(filterId2)).isEqualTo(expectedHashes2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getBlockChangesShouldResetFilterExpireDate() {
|
||||
BlockFilter filter = spy(new BlockFilter("foo"));
|
||||
doReturn(Optional.of(filter))
|
||||
.when(filterRepository)
|
||||
.getFilter(eq("foo"), eq(BlockFilter.class));
|
||||
|
||||
filterManager.blockChanges("foo");
|
||||
|
||||
verify(filter).resetExpireTime();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getPendingTransactionsChangesShouldResetFilterExpireDate() {
|
||||
PendingTransactionFilter filter = spy(new PendingTransactionFilter("foo"));
|
||||
doReturn(Optional.of(filter))
|
||||
.when(filterRepository)
|
||||
.getFilter(eq("foo"), eq(PendingTransactionFilter.class));
|
||||
|
||||
filterManager.pendingTransactionChanges("foo");
|
||||
|
||||
verify(filter).resetExpireTime();
|
||||
}
|
||||
|
||||
private Hash appendBlockToBlockchain() {
|
||||
final long blockNumber = currentBlock.getHeader().getNumber() + 1;
|
||||
final Hash parentHash = currentBlock.getHash();
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.catchThrowable;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.assertj.core.util.Lists;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FilterRepositoryTest {
|
||||
|
||||
private FilterRepository repository;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
repository = new FilterRepository();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getFiltersShouldReturnAllFilters() {
|
||||
BlockFilter filter1 = new BlockFilter("foo");
|
||||
BlockFilter filter2 = new BlockFilter("bar");
|
||||
repository.save(filter1);
|
||||
repository.save(filter2);
|
||||
|
||||
Collection<Filter> filters = repository.getFilters();
|
||||
|
||||
assertThat(filters).containsExactlyInAnyOrderElementsOf(Lists.newArrayList(filter1, filter2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getFiltersShouldReturnEmptyListWhenRepositoryIsEmpty() {
|
||||
assertThat(repository.getFilters()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void saveShouldAddFilterToRepository() {
|
||||
BlockFilter filter = new BlockFilter("id");
|
||||
repository.save(filter);
|
||||
|
||||
BlockFilter retrievedFilter = repository.getFilter("id", BlockFilter.class).get();
|
||||
|
||||
assertThat(retrievedFilter).isEqualToComparingFieldByField(filter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void saveNullFilterShouldFail() {
|
||||
Throwable throwable = catchThrowable(() -> repository.save(null));
|
||||
|
||||
assertThat(throwable)
|
||||
.isInstanceOf(IllegalArgumentException.class)
|
||||
.hasMessage("Can't save null filter");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void saveFilterWithSameIdShouldFail() {
|
||||
BlockFilter filter = new BlockFilter("x");
|
||||
repository.save(filter);
|
||||
|
||||
Throwable throwable = catchThrowable(() -> repository.save(filter));
|
||||
|
||||
assertThat(throwable)
|
||||
.isInstanceOf(IllegalArgumentException.class)
|
||||
.hasMessage("Filter with id x already exists");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSingleFilterShouldReturnExistingFilterOfCorrectType() {
|
||||
BlockFilter filter = new BlockFilter("id");
|
||||
repository.save(filter);
|
||||
|
||||
Optional<BlockFilter> optional = repository.getFilter(filter.getId(), BlockFilter.class);
|
||||
|
||||
assertThat(optional.isPresent()).isTrue();
|
||||
assertThat(optional.get()).isEqualToComparingFieldByField(filter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSingleFilterShouldReturnEmptyForFilterOfIncorrectType() {
|
||||
BlockFilter filter = new BlockFilter("id");
|
||||
repository.save(filter);
|
||||
|
||||
Optional<PendingTransactionFilter> optional =
|
||||
repository.getFilter(filter.getId(), PendingTransactionFilter.class);
|
||||
|
||||
assertThat(optional.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSingleFilterShouldReturnEmptyForAbsentId() {
|
||||
BlockFilter filter = new BlockFilter("foo");
|
||||
repository.save(filter);
|
||||
|
||||
Optional<BlockFilter> optional = repository.getFilter("bar", BlockFilter.class);
|
||||
|
||||
assertThat(optional.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSingleFilterShouldReturnEmptyForEmptyRepository() {
|
||||
Optional<BlockFilter> optional = repository.getFilter("id", BlockFilter.class);
|
||||
|
||||
assertThat(optional.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getFilterCollectionShouldReturnAllFiltersOfSpecificType() {
|
||||
BlockFilter blockFilter1 = new BlockFilter("foo");
|
||||
BlockFilter blockFilter2 = new BlockFilter("biz");
|
||||
PendingTransactionFilter pendingTxFilter1 = new PendingTransactionFilter("bar");
|
||||
|
||||
Collection<BlockFilter> expectedFilters = Lists.newArrayList(blockFilter1, blockFilter2);
|
||||
|
||||
repository.save(blockFilter1);
|
||||
repository.save(blockFilter2);
|
||||
repository.save(pendingTxFilter1);
|
||||
|
||||
Collection<BlockFilter> blockFilters = repository.getFiltersOfType(BlockFilter.class);
|
||||
|
||||
assertThat(blockFilters).containsExactlyInAnyOrderElementsOf(expectedFilters);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getFilterCollectionShouldReturnEmptyForNoneMatchingTypes() {
|
||||
PendingTransactionFilter filter = new PendingTransactionFilter("foo");
|
||||
repository.save(filter);
|
||||
|
||||
Collection<BlockFilter> filters = repository.getFiltersOfType(BlockFilter.class);
|
||||
|
||||
assertThat(filters).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getFilterCollectionShouldReturnEmptyListForEmptyRepository() {
|
||||
Collection<BlockFilter> filters = repository.getFiltersOfType(BlockFilter.class);
|
||||
|
||||
assertThat(filters).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void existsShouldReturnTrueForExistingId() {
|
||||
BlockFilter filter = new BlockFilter("id");
|
||||
repository.save(filter);
|
||||
|
||||
assertThat(repository.exists("id")).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void existsShouldReturnFalseForAbsentId() {
|
||||
BlockFilter filter = new BlockFilter("foo");
|
||||
repository.save(filter);
|
||||
|
||||
assertThat(repository.exists("bar")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void existsShouldReturnFalseForEmptyRepository() {
|
||||
assertThat(repository.exists("id")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteExistingFilterShouldDeleteSuccessfully() {
|
||||
BlockFilter filter = new BlockFilter("foo");
|
||||
repository.save(filter);
|
||||
repository.delete(filter.getId());
|
||||
|
||||
assertThat(repository.exists(filter.getId())).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteAbsentFilterDoesNothing() {
|
||||
assertThat(repository.exists("foo")).isFalse();
|
||||
repository.delete("foo");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteAllShouldClearFilters() {
|
||||
BlockFilter filter1 = new BlockFilter("foo");
|
||||
BlockFilter filter2 = new BlockFilter("biz");
|
||||
repository.save(filter1);
|
||||
repository.save(filter2);
|
||||
|
||||
repository.deleteAll();
|
||||
|
||||
assertThat(repository.exists(filter1.getId())).isFalse();
|
||||
assertThat(repository.exists(filter2.getId())).isFalse();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class FilterTest {
|
||||
|
||||
@Test
|
||||
public void filterJustCreatedShouldNotBeExpired() {
|
||||
BlockFilter filter = new BlockFilter("foo");
|
||||
|
||||
assertThat(filter.isExpired()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void isExpiredShouldReturnTrueForExpiredFilter() {
|
||||
BlockFilter filter = new BlockFilter("foo");
|
||||
filter.setExpireTime(Instant.now().minusSeconds(1));
|
||||
|
||||
assertThat(filter.isExpired()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resetExpireDateShouldIncrementExpireDate() {
|
||||
BlockFilter filter = new BlockFilter("foo");
|
||||
filter.setExpireTime(Instant.now().minus(Duration.ofDays(1)));
|
||||
filter.resetExpireTime();
|
||||
|
||||
assertThat(filter.getExpireTime())
|
||||
.isBeforeOrEqualTo(Instant.now().plus(Duration.ofMinutes(10)));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class FilterTimeoutMonitorTest {
|
||||
|
||||
@Mock private FilterRepository filterRepository;
|
||||
|
||||
private FilterTimeoutMonitor timeoutMonitor;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
timeoutMonitor = new FilterTimeoutMonitor(filterRepository);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void expiredFilterShouldBeDeleted() {
|
||||
Filter filter = spy(new BlockFilter("foo"));
|
||||
when(filter.isExpired()).thenReturn(true);
|
||||
when(filterRepository.getFilters()).thenReturn(Lists.newArrayList(filter));
|
||||
|
||||
timeoutMonitor.checkFilters();
|
||||
|
||||
verify(filterRepository).getFilters();
|
||||
verify(filterRepository).delete("foo");
|
||||
verifyNoMoreInteractions(filterRepository);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nonExpiredFilterShouldNotBeDeleted() {
|
||||
Filter filter = mock(Filter.class);
|
||||
when(filter.isExpired()).thenReturn(false);
|
||||
when(filterRepository.getFilters()).thenReturn(Lists.newArrayList(filter));
|
||||
|
||||
timeoutMonitor.checkFilters();
|
||||
|
||||
verify(filter).isExpired();
|
||||
verifyNoMoreInteractions(filter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkEmptyFilterRepositoryDoesNothing() {
|
||||
when(filterRepository.getFilters()).thenReturn(Collections.emptyList());
|
||||
|
||||
timeoutMonitor.checkFilters();
|
||||
|
||||
verify(filterRepository).getFilters();
|
||||
verifyNoMoreInteractions(filterRepository);
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,9 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcConfiguration.RpcApis;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcHttpService;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.JsonRpcMethodsFactory;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterIdGenerator;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterManager;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.filter.FilterRepository;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
|
||||
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketConfiguration;
|
||||
@@ -127,6 +130,8 @@ public class RunnerBuilder {
|
||||
final AbstractMiningCoordinator<?, ?> miningCoordinator =
|
||||
pantheonController.getMiningCoordinator();
|
||||
|
||||
final FilterManager filterManager = createFilterManager(vertx, context, transactionPool);
|
||||
|
||||
Optional<JsonRpcHttpService> jsonRpcHttpService = Optional.empty();
|
||||
if (jsonRpcConfiguration.isEnabled()) {
|
||||
final Map<String, JsonRpcMethod> jsonRpcMethods =
|
||||
@@ -139,7 +144,8 @@ public class RunnerBuilder {
|
||||
transactionPool,
|
||||
miningCoordinator,
|
||||
supportedCapabilities,
|
||||
jsonRpcConfiguration.getRpcApis());
|
||||
jsonRpcConfiguration.getRpcApis(),
|
||||
filterManager);
|
||||
jsonRpcHttpService =
|
||||
Optional.of(new JsonRpcHttpService(vertx, jsonRpcConfiguration, jsonRpcMethods));
|
||||
}
|
||||
@@ -156,7 +162,8 @@ public class RunnerBuilder {
|
||||
transactionPool,
|
||||
miningCoordinator,
|
||||
supportedCapabilities,
|
||||
webSocketConfiguration.getRpcApis());
|
||||
webSocketConfiguration.getRpcApis(),
|
||||
filterManager);
|
||||
|
||||
final SubscriptionManager subscriptionManager =
|
||||
createSubscriptionManager(vertx, context.getBlockchain(), transactionPool);
|
||||
@@ -179,6 +186,18 @@ public class RunnerBuilder {
|
||||
vertx, networkRunner, jsonRpcHttpService, webSocketService, pantheonController, dataDir);
|
||||
}
|
||||
|
||||
private FilterManager createFilterManager(
|
||||
final Vertx vertx, final ProtocolContext<?> context, final TransactionPool transactionPool) {
|
||||
FilterManager filterManager =
|
||||
new FilterManager(
|
||||
new BlockchainQueries(context.getBlockchain(), context.getWorldStateArchive()),
|
||||
transactionPool,
|
||||
new FilterIdGenerator(),
|
||||
new FilterRepository());
|
||||
vertx.deployVerticle(filterManager);
|
||||
return filterManager;
|
||||
}
|
||||
|
||||
private Map<String, JsonRpcMethod> jsonRpcMethods(
|
||||
final ProtocolContext<?> context,
|
||||
final ProtocolSchedule<?> protocolSchedule,
|
||||
@@ -188,7 +207,8 @@ public class RunnerBuilder {
|
||||
final TransactionPool transactionPool,
|
||||
final AbstractMiningCoordinator<?, ?> miningCoordinator,
|
||||
final Set<Capability> supportedCapabilities,
|
||||
final Collection<RpcApis> jsonRpcApis) {
|
||||
final Collection<RpcApis> jsonRpcApis,
|
||||
final FilterManager filterManager) {
|
||||
final Map<String, JsonRpcMethod> methods =
|
||||
new JsonRpcMethodsFactory()
|
||||
.methods(
|
||||
@@ -202,7 +222,8 @@ public class RunnerBuilder {
|
||||
protocolSchedule,
|
||||
miningCoordinator,
|
||||
supportedCapabilities,
|
||||
jsonRpcApis);
|
||||
jsonRpcApis,
|
||||
filterManager);
|
||||
|
||||
if (context.getConsensusState() instanceof CliqueContext) {
|
||||
// This is checked before entering this if branch
|
||||
|
||||
Reference in New Issue
Block a user