Don't delete completed tasks from RocksDbTaskQueue (#1099)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
Authored by Adrian Sutton on 2019-03-15 06:27:35 +10:00; committed by GitHub
parent b5b4f4e85e, commit 6e8fa9ecc9
3 changed files with 5 additions and 95 deletions

FastSynchronizer.java

@@ -180,11 +180,6 @@ class FastSynchronizer<C> {
"Pending request cache size for fast sync world state download",
taskCollection::cacheSize);
// We're using the CachingTaskCollection which isn't designed to reliably persist all
// added tasks. We therefore can't resume from previously added tasks.
// So for now, clear tasks when we start up.
taskCollection.clear();
return taskCollection;
}
}

RocksDbTaskQueue.java

@@ -19,7 +19,6 @@ import tech.pegasys.pantheon.services.util.RocksDbUtil;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 import java.nio.file.Path;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +37,6 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
   private long lastEnqueuedKey = 0;
   private long lastDequeuedKey = 0;
-  private long oldestKey = 0;
   private RocksIterator dequeueIterator;
   private long lastValidKeyFromIterator;
   private final Set<RocksDbTask<T>> outstandingTasks = new HashSet<>();
@@ -60,7 +58,9 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
     this.deserializer = deserializer;
     try {
       RocksDbUtil.loadNativeLibrary();
-      options = new Options().setCreateIfMissing(true);
+      // We don't support reloading data so ensure we're starting from a clean slate.
+      RocksDB.destroyDB(storageDirectory.toString(), new Options());
+      options = new Options().setCreateIfMissing(true).setErrorIfExists(true);
       db = RocksDB.open(options, storageDirectory.toString());
       enqueueLatency =
@@ -74,29 +74,11 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
"dequeue_latency_seconds",
"Latency for dequeuing an item.");
// Initialize queue from existing db
initializeQueue();
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
private void initializeQueue() {
RocksIterator iter = db.newIterator();
iter.seekToFirst();
if (!iter.isValid()) {
// There is no data yet, nothing to do
return;
}
long firstKey = Longs.fromByteArray(iter.key());
iter.seekToLast();
long lastKey = Longs.fromByteArray(iter.key());
lastDequeuedKey = firstKey - 1;
oldestKey = firstKey;
lastEnqueuedKey = lastKey;
}
public static <T> RocksDbTaskQueue<T> create(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
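
With the database destroyed up front there is nothing left to resume from, which is why the initializeQueue() bookkeeping above could be deleted outright. Below is a minimal, standalone sketch of that clean-slate startup pattern; the class name, directory and the stock RocksDB.loadLibrary() call are illustrative, not Pantheon code.

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class CleanSlateStartupSketch {
  public static void main(final String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    final String storageDirectory = "/tmp/task-queue-demo";

    // Wipe whatever a previous run left behind: reloading old tasks is no longer supported.
    RocksDB.destroyDB(storageDirectory, new Options());

    // errorIfExists(true) turns any leftover database into a hard failure instead of
    // silently starting with stale tasks.
    try (final Options options =
            new Options().setCreateIfMissing(true).setErrorIfExists(true);
        final RocksDB db = RocksDB.open(options, storageDirectory)) {
      // The store is guaranteed empty here, so key bookkeeping can start from zero.
    }
  }
}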
@@ -167,7 +149,7 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
   public synchronized void clear() {
     assertNotClosed();
     outstandingTasks.clear();
-    final byte[] from = Longs.toByteArray(oldestKey);
+    final byte[] from = Longs.toByteArray(0);
     final byte[] to = Longs.toByteArray(lastEnqueuedKey + 1);
     try {
       db.deleteRange(from, to);
@@ -177,7 +159,6 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
       }
       lastDequeuedKey = 0;
       lastEnqueuedKey = 0;
-      oldestKey = 0;
     } catch (final RocksDBException e) {
       throw new StorageException(e);
     }
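
clear() keeps using RocksDB's deleteRange, whose end key is exclusive (hence lastEnqueuedKey + 1); with incremental cleanup gone, the lower bound no longer needs oldestKey and can simply be 0. A small self-contained illustration of that range, with made-up keys and directory:

import com.google.common.primitives.Longs;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class DeleteRangeSketch {
  public static void main(final String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (final Options options = new Options().setCreateIfMissing(true);
        final RocksDB db = RocksDB.open(options, "/tmp/delete-range-demo")) {
      long lastEnqueuedKey = 0;
      for (long key = 1; key <= 5; key++) {
        db.put(Longs.toByteArray(key), new byte[] {(byte) key}); // stand-in task payloads
        lastEnqueuedKey = key;
      }
      // [0, lastEnqueuedKey + 1) covers every key ever enqueued; the end key is exclusive.
      db.deleteRange(Longs.toByteArray(0), Longs.toByteArray(lastEnqueuedKey + 1));
      System.out.println(db.get(Longs.toByteArray(3)) == null); // true: all rows removed
    }
  }
}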
@@ -188,26 +169,6 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
     return isEmpty() && outstandingTasks.isEmpty();
   }
-  private synchronized void deleteCompletedTasks() {
-    final long oldestOutstandingKey =
-        outstandingTasks.stream()
-            .min(Comparator.comparingLong(RocksDbTask::getKey))
-            .map(RocksDbTask::getKey)
-            .orElse(lastDequeuedKey + 1);
-    if (oldestKey < oldestOutstandingKey) {
-      // Delete all contiguous completed tasks
-      final byte[] fromKey = Longs.toByteArray(oldestKey);
-      final byte[] toKey = Longs.toByteArray(oldestOutstandingKey);
-      try {
-        db.deleteRange(fromKey, toKey);
-        oldestKey = oldestOutstandingKey;
-      } catch (final RocksDBException e) {
-        throw new StorageException(e);
-      }
-    }
-  }
   @Override
   public synchronized void close() {
     if (closed) {
@@ -228,11 +189,7 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {
   }
   private synchronized boolean markTaskCompleted(final RocksDbTask<T> task) {
-    if (outstandingTasks.remove(task)) {
-      deleteCompletedTasks();
-      return true;
-    }
-    return false;
+    return outstandingTasks.remove(task);
   }
   private synchronized void handleFailedTask(final RocksDbTask<T> task) {
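
The deleted deleteCompletedTasks() only ever removed the contiguous prefix of completed tasks, so completed rows sitting behind a still-outstanding task lingered in the database regardless. A worked example of that bookkeeping, using plain longs rather than RocksDbTask objects (illustrative only, not project code):

import java.util.Comparator;
import java.util.Set;

public class ContiguousCompletionSketch {
  public static void main(final String[] args) {
    // Keys 1..6 have been dequeued; tasks 4 and 6 are still outstanding, the rest completed.
    final Set<Long> outstandingKeys = Set.of(4L, 6L);
    final long lastDequeuedKey = 6;
    final long oldestKey = 1;

    final long oldestOutstandingKey =
        outstandingKeys.stream()
            .min(Comparator.naturalOrder())
            .orElse(lastDequeuedKey + 1); // nothing outstanding -> everything dequeued is done

    // Only the contiguous completed prefix [oldestKey, oldestOutstandingKey) was deleted:
    // rows 1..3 go, but completed row 5 had to wait until task 4 also finished.
    System.out.printf("deleteRange(%d, %d)%n", oldestKey, oldestOutstandingKey);
  }
}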

RocksDbTaskQueueTest.java

@@ -12,8 +12,6 @@
  */
 package tech.pegasys.pantheon.services.tasks;
-import static org.assertj.core.api.Assertions.assertThat;
 import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
@@ -22,7 +20,6 @@ import java.nio.file.Path;
 import java.util.function.Function;
 import org.junit.Rule;
-import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 public class RocksDbTaskQueueTest extends AbstractTaskQueueTest<RocksDbTaskQueue<BytesValue>> {
@@ -39,43 +36,4 @@ public class RocksDbTaskQueueTest extends AbstractTaskQueueTest<RocksDbTaskQueue
     return RocksDbTaskQueue.create(
         dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem());
   }
-  @Test
-  public void shouldResumeFromExistingQueue() throws Exception {
-    testResumeFromExistingQueue(10);
-  }
-  @Test
-  public void shouldResumeFromExistingQueueWithOneElement() throws Exception {
-    testResumeFromExistingQueue(1);
-  }
-  @Test
-  public void shouldResumeFromExistingQueueWithNoElements() throws Exception {
-    testResumeFromExistingQueue(0);
-  }
-  private void testResumeFromExistingQueue(final int elementCount) throws Exception {
-    final Path dataDir = folder.newFolder().toPath();
-    try (final RocksDbTaskQueue<BytesValue> queue = createQueue(dataDir)) {
-      for (int i = 0; i < elementCount; i++) {
-        queue.add(BytesValue.of(i));
-      }
-    }
-    try (final RocksDbTaskQueue<BytesValue> resumedQueue = createQueue(dataDir)) {
-      assertThat(resumedQueue.size()).isEqualTo(elementCount);
-      // Queue an additional element
-      resumedQueue.add(BytesValue.of(99));
-      assertThat(resumedQueue.size()).isEqualTo(elementCount + 1);
-      // Check that everything dequeues in order as expected
-      for (int i = 0; i < elementCount; i++) {
-        assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(i));
-      }
-      assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(99));
-      assertThat(resumedQueue.size()).isEqualTo(0);
-    }
-  }
 }
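
The resume tests above no longer apply. Here is a sketch, not part of this commit, of a test that would pin down the new behaviour instead, reusing this class's createQueue helper and folder rule; the method name is invented, and it would need the org.junit.Test and assertThat imports that this commit removed.

  @Test
  public void shouldStartEmptyWhenReopenedOnAnExistingDirectory() throws Exception {
    final Path dataDir = folder.newFolder().toPath();
    try (final RocksDbTaskQueue<BytesValue> queue = createQueue(dataDir)) {
      queue.add(BytesValue.of(1));
      queue.add(BytesValue.of(2));
      assertThat(queue.size()).isEqualTo(2);
    }
    // The constructor destroys any existing database, so nothing survives a restart.
    try (final RocksDbTaskQueue<BytesValue> reopenedQueue = createQueue(dataDir)) {
      assertThat(reopenedQueue.size()).isEqualTo(0);
    }
  }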