Merge branch 'master' into table-wrappers

Conflicts:
	src/main/java/edu/jhuapl/tinkerpop/AccumuloGraph.java
This commit is contained in:
Michael Lieberman
2014-12-24 11:43:26 -05:00
7 changed files with 291 additions and 175 deletions

View File

@@ -148,12 +148,12 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
BatchWriter vertexBW;
BatchWriter edgeBW;
ElementCache<Vertex> vertexCache;
ElementCache<Edge> edgeCache;
VertexTableWrapper vertexWrapper;
EdgeTableWrapper edgeWrapper;
LruElementCache<Vertex> vertexCache;
LruElementCache<Edge> edgeCache;
public AccumuloGraph(Configuration cfg) {
this(new AccumuloGraphConfiguration(cfg));
}
@@ -167,11 +167,13 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
config.validate();
this.config = config;
if (config.getLruCacheEnabled()) {
vertexCache = new LruElementCache<Vertex>(config.getLruMaxCapacity(),
if (config.getVertexCacheEnabled()) {
vertexCache = new ElementCache<Vertex>(config.getVertexCacheSize(),
config.getVertexCacheTimeout());
}
edgeCache = new LruElementCache<Edge>(config.getLruMaxCapacity(),
if (config.getEdgeCacheEnabled()) {
edgeCache = new ElementCache<Edge>(config.getEdgeCacheSize(),
config.getEdgeCacheTimeout());
}

View File

@@ -111,11 +111,12 @@ public class AccumuloGraphConfiguration implements Serializable {
public static final String SPLITS = "blueprints.accumulo.splits";
public static final String COLVIS = "blueprints.accumulo.columnVisibility";
public static final String SKIP_CHECKS = "blueprints.accumulo.skipExistenceChecks";
public static final String LRU_MAX_CAP = "blueprints.accumulo.lruMaximumCapacity";
public static final String PRELOAD_PROPERTIES = "blueprints.accumulo.property.preload";
public static final String EDGE_CACHE_TIMEOUT = "blueprints.accumulo.edgeCacheTimeout";
public static final String PROPERTY_CACHE_TIMEOUT = "blueprints.accumulo.propertyCacheTimeout";
public static final String EDGE_CACHE_SIZE = "blueprints.accumulo.edgeCacheSize";
public static final String EDGE_CACHE_TIMEOUT = "blueprints.accumulo.edgeCacheTimeout";
public static final String VERTEX_CACHE_TIMEOUT = "blueprints.accumulo.vertexCacheTimeout";
public static final String VERTEX_CACHE_SIZE = "blueprints.accumulo.vertexCacheSize";
public static final String PRELOAD_EDGES = "blueprints.accumulo.edge.preload";
public static final String AUTO_INDEX = "blueprints.accumulo.index.auto";
public static final String DISABLE_INDEX = "blueprints.accumulo.index.disable";
@@ -423,13 +424,27 @@ public class AccumuloGraphConfiguration implements Serializable {
return this;
}
/**
* Whether the vertex cache is enabled (i.e., both
* size and timeout are positive).
* @return
*/
public boolean getVertexCacheEnabled() {
return getVertexCacheSize() > 0 && getVertexCacheTimeout() > 0;
}
public int getVertexCacheSize() {
return conf.getInt(Keys.VERTEX_CACHE_SIZE, -1);
}
public int getVertexCacheTimeout() {
return conf.getInt(Keys.VERTEX_CACHE_TIMEOUT, 30000);
return conf.getInt(Keys.VERTEX_CACHE_TIMEOUT, -1);
}
/**
* Sets the number of milliseconds since retrieval that a Vertex instance will be maintained
* in a RAM cache before that value is expired. If this value is
* in a RAM cache before that value is expired. Also set the maximum
* size of the cache. If these values are
* unset or set to 0 (or a negative number) no caching will be performed.
* <P>
* A round-trip to Accumulo to retrieve a Vertex is an expensive operation.
@@ -441,27 +456,53 @@ public class AccumuloGraphConfiguration implements Serializable {
* <P>
* The default is unset (no caching).
*
* @param size
* maximum size of the cache
* @param millis
* the maximum number of milliseconds a Vertex should be held in RAM
* @return
*/
public AccumuloGraphConfiguration setVertexCacheTimeout(int millis) {
public AccumuloGraphConfiguration setVertexCacheParams(int size, int millis) {
if ((size <= 0 || millis <= 0) && (size > 0 || millis > 0)) {
throw new IllegalArgumentException("Parameters must be both non-positive or both positive");
}
if (size <= 0) {
conf.clearProperty(Keys.VERTEX_CACHE_SIZE);
} else {
conf.setProperty(Keys.VERTEX_CACHE_SIZE, size);
}
if (millis <= 0) {
conf.clearProperty(Keys.VERTEX_CACHE_TIMEOUT);
} else {
conf.setProperty(Keys.VERTEX_CACHE_TIMEOUT, millis);
}
return this;
}
/**
* Whether the edge cache is enabled (i.e., both
* size and timeout are positive).
* @return
*/
public boolean getEdgeCacheEnabled() {
return getEdgeCacheSize() > 0 && getEdgeCacheTimeout() > 0;
}
public int getEdgeCacheSize() {
return conf.getInt(Keys.EDGE_CACHE_SIZE, -1);
}
public int getEdgeCacheTimeout() {
return conf.getInt(Keys.EDGE_CACHE_TIMEOUT, 30000);
return conf.getInt(Keys.EDGE_CACHE_TIMEOUT, -1);
}
/**
* Sets the number of milliseconds since retrieval that an Edge instance will be maintained
* in a RAM cache before that value is expired. If this value is
* unset or set to 0 (or a negative number) no caching will be performed.
* in a RAM cache before that value is expired. Also set the maximum size of the cache.
* If these values are unset or set to 0 (or a negative number) no caching will be performed.
* <P>
* A round-trip to Accumulo to retrieve an Edge is an expensive operation.
* Setting this value to a positive number allows the AccumuloGraph to cache retrieved
@@ -472,16 +513,29 @@ public class AccumuloGraphConfiguration implements Serializable {
* <P>
* The default is unset (no caching).
*
* @param size
* maximum size of the cache
* @param millis
* the maximum number of milliseconds an Edge should be held in RAM
* @return
*/
public AccumuloGraphConfiguration setEdgeCacheTimeout(int millis) {
public AccumuloGraphConfiguration setEdgeCacheParams(int size, int millis) {
if ((size <= 0 || millis <= 0) && (size > 0 || millis > 0)) {
throw new IllegalArgumentException("Parameters must be both non-positive or both positive");
}
if (size <= 0) {
conf.clearProperty(Keys.EDGE_CACHE_SIZE);
} else {
conf.setProperty(Keys.EDGE_CACHE_SIZE, size);
}
if (millis <= 0) {
conf.clearProperty(Keys.EDGE_CACHE_TIMEOUT);
} else {
conf.setProperty(Keys.EDGE_CACHE_TIMEOUT, millis);
}
return this;
}
@@ -517,35 +571,6 @@ public class AccumuloGraphConfiguration implements Serializable {
return this;
}
/**
* True if the LRU cache is enabled (i.e., LRU max capacity is positive).
* See {@link #setLruMaxCapacity(int)}.
* @return
*/
public boolean getLruCacheEnabled() {
return getLruMaxCapacity() > 0;
}
public int getLruMaxCapacity() {
return conf.getInt(Keys.LRU_MAX_CAP, -1);
}
/**
* The Graph can use a least-recently used (LRU) cache to avoid
* round-trip checks to Accumulo at the cost of consistency. Set this
* value to the maximum number of vertices or edges to be cached.
* A negative number means do not cache any values.<p/>
*
* TODO This should probably be time-based.
*
* @param maxSize
* @return
*/
public AccumuloGraphConfiguration setLruMaxCapacity(int max) {
conf.setProperty(Keys.LRU_MAX_CAP, max);
return this;
}
public boolean getAutoIndex() {
Object bool = conf.getProperty(Keys.AUTO_INDEX);
if (bool == null)
@@ -1081,14 +1106,6 @@ public class AccumuloGraphConfiguration implements Serializable {
return getGraphName();
}
/**
* @deprecated This is an old method name. Use {@link #getLruCacheEnabled()}.
* @return
*/
public boolean useLruCache() {
return getLruCacheEnabled();
}
/**
* @deprecated This is an old method name. Use {@link #getPreloadedEdgeLabels()}.
* @return

View File

@@ -0,0 +1,50 @@
/******************************************************************************
* COPYRIGHT NOTICE *
* Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory *
* All rights reserved. *
* *
* This material may only be used, modified, or reproduced by or for the *
* U.S. Government pursuant to the license rights granted under FAR clause *
* 52.227-14 or DFARS clauses 252.227-7013/7014. *
* *
* For any other permissions, please contact the Legal Office at JHU/APL. *
******************************************************************************/
package edu.jhuapl.tinkerpop;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.tinkerpop.blueprints.Element;
/**
* Simple cache for retrieved graph elements,
* backed by Guava's cache implementation.
*/
public class ElementCache<T extends Element> {
private Cache<Object, T> cache;
public ElementCache(int size, int timeout) {
cache = CacheBuilder.newBuilder()
.maximumSize(size)
.expireAfterAccess(timeout, TimeUnit.MILLISECONDS)
.build();
}
public void cache(T element) {
cache.put(element.getId(), element);
}
public T retrieve(Object id) {
return cache.getIfPresent(id);
}
public void remove(Object id) {
cache.invalidate(id);
}
public void clear() {
cache.invalidateAll();
}
}

View File

@@ -1,122 +0,0 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.LinkedHashMap;
import java.util.Map;
import com.tinkerpop.blueprints.Element;
class LruElementCache<T extends Element> extends LinkedHashMap<String,LruElementCache<T>.Entry> implements Runnable {
private static final long serialVersionUID = 1435352624360026357L;
ReferenceQueue<T> queue;
Integer maxCapacity;
int timeout;
public LruElementCache(int timeout) {
super(32, .75f, true);
this.maxCapacity = null;
init(timeout);
}
public LruElementCache(int maxCapacity, int timeout) {
super(maxCapacity + 1, 1f, true);
this.maxCapacity = maxCapacity;
init(timeout);
}
private void init(int timeout) {
this.timeout = timeout;
this.queue = new ReferenceQueue<T>();
Thread t = new Thread(this, "lru-cache-reaper-" + System.identityHashCode(this));
t.setPriority(Thread.MIN_PRIORITY);
t.setDaemon(true);
t.start();
}
@Override
protected boolean removeEldestEntry(Map.Entry<String,LruElementCache<T>.Entry> eldest) {
if (maxCapacity != null) {
return size() > maxCapacity;
} else {
// no cap on size, but see if eldest has timed out or been gc'ed
// already...
return eldest.getValue().getElement() == null;
}
}
@Override
public synchronized Entry remove(Object id) {
return super.remove(id);
}
public synchronized void cache(T element) {
Entry entry = new Entry(element, queue, timeout);
put(element.getId().toString(), entry);
}
public synchronized T retrieve(String id) {
LruElementCache<T>.Entry entry = get(id);
if (entry == null) {
// not cached...
return null;
}
T element = (T) entry.getElement();
if (element == null) {
// gc'ed or timed out; either way...
remove(id);
}
return element;
}
@Override
public void run() {
while (true) {
try {
Entry entry = (Entry) queue.remove();
synchronized (this) {
remove(entry.id);
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
final class Entry extends SoftReference<T> {
String id;
long timeout;
public Entry(T element, ReferenceQueue<T> queue, int timeout) {
super(element, queue);
this.id = element.getId().toString();
this.timeout = System.currentTimeMillis() + timeout;
}
public T getElement() {
if (System.currentTimeMillis() <= timeout) {
return get();
} else {
return null;
}
}
}
}

View File

@@ -152,4 +152,67 @@ public class AccumuloGraphConfigurationTest {
cfg.print();
}
@Test
public void testInvalidCacheParams() throws Exception {
int size = 100;
int timeout = 30000;
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils
.generateGraphConfig("cacheParams");
cfg.validate();
// Vertex cache.
assertFalse(cfg.getVertexCacheEnabled());
try {
cfg.setVertexCacheParams(-1, timeout);
fail();
} catch (Exception e) { }
try {
cfg.setVertexCacheParams(size, -1);
fail();
} catch (Exception e) { }
assertFalse(cfg.getVertexCacheEnabled());
cfg.setVertexCacheParams(size, timeout);
cfg.validate();
assertTrue(cfg.getVertexCacheEnabled());
assertEquals(size, cfg.getVertexCacheSize());
assertEquals(timeout, cfg.getVertexCacheTimeout());
cfg.setVertexCacheParams(-1, -1);
cfg.validate();
assertFalse(cfg.getVertexCacheEnabled());
// Edge cache.
assertFalse(cfg.getEdgeCacheEnabled());
try {
cfg.setEdgeCacheParams(-1, timeout);
fail();
} catch (Exception e) { }
try {
cfg.setEdgeCacheParams(size, -1);
fail();
} catch (Exception e) { }
assertFalse(cfg.getEdgeCacheEnabled());
cfg.setEdgeCacheParams(size, timeout);
cfg.validate();
assertTrue(cfg.getEdgeCacheEnabled());
assertEquals(size, cfg.getEdgeCacheSize());
assertEquals(timeout, cfg.getEdgeCacheTimeout());
cfg.setEdgeCacheParams(-1, -1);
cfg.validate();
assertFalse(cfg.getEdgeCacheEnabled());
}
}

View File

@@ -0,0 +1,103 @@
/******************************************************************************
* COPYRIGHT NOTICE *
* Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory *
* All rights reserved. *
* *
* This material may only be used, modified, or reproduced by or for the *
* U.S. Government pursuant to the license rights granted under FAR clause *
* 52.227-14 or DFARS clauses 252.227-7013/7014. *
* *
* For any other permissions, please contact the Legal Office at JHU/APL. *
******************************************************************************/
package edu.jhuapl.tinkerpop;
import org.junit.Test;
import static org.junit.Assert.*;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.Vertex;
public class ElementCacheTest {
@Test
public void testElementCacheSize() throws Exception {
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils
.generateGraphConfig("elementCacheSize");
Graph graph = GraphFactory.open(cfg.getConfiguration());
Vertex[] verts = new Vertex[10];
for (int i = 0; i < verts.length; i++) {
verts[i] = graph.addVertex(i);
}
Edge[] edges = new Edge[9];
for (int i = 0; i < edges.length; i++) {
edges[i] = graph.addEdge(null,
verts[0], verts[i+1], "edge");
}
sizeTests(verts);
sizeTests(edges);
graph.shutdown();
}
private void sizeTests(Element[] elts) {
ElementCache<Element> cache =
new ElementCache<Element>(3, 120000);
for (Element e : elts) {
cache.cache(e);
}
for (Element e : elts) {
cache.cache(e);
}
int total = 0;
for (Element e : elts) {
if (cache.retrieve(e.getId()) != null) {
total++;
}
}
assertTrue(total < elts.length);
cache.clear();
for (Element e : elts) {
assertNull(cache.retrieve(e.getId()));
}
}
@Test
public void testElementCacheTimeout() throws Exception {
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils
.generateGraphConfig("elementCacheTimeout");
Graph graph = GraphFactory.open(cfg.getConfiguration());
ElementCache<Element> cache =
new ElementCache<Element>(10, 2000);
Vertex v1 = graph.addVertex(1);
Vertex v2 = graph.addVertex(2);
assertNull(cache.retrieve(1));
assertNull(cache.retrieve(2));
cache.cache(v1);
assertNotNull(cache.retrieve(v1.getId()));
Thread.sleep(3000);
assertNull(cache.retrieve(v1.getId()));
Edge e = graph.addEdge(null, v1, v2, "label");
assertNull(cache.retrieve(e.getId()));
cache.cache(e);
assertNotNull(cache.retrieve(e.getId()));
Thread.sleep(3000);
assertNull(cache.retrieve(e.getId()));
graph.shutdown();
}
}

View File

@@ -7,7 +7,10 @@ public class ExtendedAccumuloGraphTest extends AccumuloGraphTest {
@Override
public Graph generateGraph(String graphDirectoryName) {
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils.generateGraphConfig(graphDirectoryName);
cfg.setLruMaxCapacity(20).setPreloadedProperties(new String[] {"name"}).setPreloadedEdgeLabels(new String[] {"knows"}).setPropertyCacheTimeout("name",100000);
cfg.setEdgeCacheParams(20, 30000)
.setPreloadedProperties(new String[] {"name"})
.setPreloadedEdgeLabels(new String[] {"knows"})
.setPropertyCacheTimeout("name",100000);
testGraphName.set(graphDirectoryName);
return GraphFactory.open(cfg.getConfiguration());
}