Property updates

Moved removeProperty to AccumuloElement
Added property cache unit tests
Moved flushing to GlobalInstances
Cruft cleanup

NOTE: Unit tests failing due to what appears to be race condition
This commit is contained in:
Michael Lieberman
2014-12-30 19:21:25 -05:00
parent 06bb292b42
commit 2d7a0d4c22
8 changed files with 310 additions and 72 deletions

View File

@@ -16,6 +16,7 @@ package edu.jhuapl.tinkerpop;
import java.util.Map;
import java.util.Set;
import com.tinkerpop.blueprints.Element;
public abstract class AccumuloElement implements Element {
@@ -78,16 +79,27 @@ public abstract class AccumuloElement implements Element {
@Override
public void setProperty(String key, Object value) {
globals.getGraph().checkProperty(key, value);
makeCache();
globals.getGraph().setProperty(type, this, key, value);
propertyCache.put(key, value);
globals.getElementWrapper(type).writeProperty(this, key, value);
globals.getGraph().setPropertyForIndexes(type, this, key, value);
}
@Override
public <T> T removeProperty(String key) {
T old = getProperty(key);
makeCache();
propertyCache.remove(key);
return globals.getGraph().removeProperty(type, this, key);
globals.getElementWrapper(type).clearProperty(this, key);
globals.getGraph().removePropertyFromIndex(type, this, key, old);
return old;
}
@Override
@@ -113,6 +125,14 @@ public abstract class AccumuloElement implements Element {
return getClass().hashCode() ^ id.hashCode();
}
/**
* Internal method for unit tests.
* @return
*/
PropertyCache getPropertyCache() {
return propertyCache;
}
/**
* @deprecated This is used in {@link AccumuloGraph} but needs to go away.
* @param key

View File

@@ -315,7 +315,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
vert = new AccumuloVertex(globals, myID);
vertexWrapper.writeVertex(vert);
checkedFlush();
globals.checkedFlush();
caches.cache(vert, Vertex.class);
@@ -402,7 +402,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
}
}
checkedFlush();
globals.checkedFlush();
scan.close();
// If Edges are found, delete the whole row
@@ -574,7 +574,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
edgeWrapper.writeEdge(edge);
vertexWrapper.writeEdgeEndpoints(edge);
checkedFlush();
globals.checkedFlush();
caches.cache(edge, Edge.class);
@@ -674,7 +674,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
globals.getVertexWrapper().deleteEdgeEndpoints(edge);
globals.getEdgeWrapper().deleteEdge(edge);
checkedFlush();
globals.checkedFlush();
edgedeleter = config.getConnector().createBatchDeleter(config.getVertexTableName(), config.getAuthorizations(), config.getQueryThreads(),
config.getBatchWriterConfig());
Mutators.deleteElementRanges(edgedeleter, edge);
@@ -830,53 +830,9 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
}
public void flush() {
try {
writer.flush();
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
}
private void checkedFlush() {
if (config.getAutoFlush()) {
flush();
}
}
// methods used by AccumuloElement, AccumuloVertex, AccumuloEdge to interact
// with the backing Accumulo data store...
void preloadProperties(AccumuloElement element, Class<? extends Element> type) {
String[] toPreload = config.getPreloadedProperties();
if (toPreload == null) {
return;
}
Scanner s = getElementScanner(type);
s.setRange(new Range(element.getId().toString()));
// user has requested specific properties...
Text colf = new Text("");
for (String key : toPreload) {
if (StringFactory.LABEL.equals(key)) {
colf.set(AccumuloGraph.LABEL);
} else {
colf.set(key);
}
s.fetchColumnFamily(colf);
}
Iterator<Entry<Key, Value>> iter = s.iterator();
// Integer timeout = config.getPropertyCacheTimeoutMillis(); // Change this
while (iter.hasNext()) {
Entry<Key,Value> entry = iter.next();
Object val = AccumuloByteSerializer.deserialize(entry.getValue().get());
element.cacheProperty(entry.getKey().getColumnFamily().toString(), val);
}
s.close();
}
/**
* Sets the property. Requires a round-trip to Accumulo to see if the property exists
* iff the provided key has an index. Therefore, for best performance, if at
@@ -887,13 +843,12 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
* @param key
* @param val
*/
void setProperty(Class<? extends Element> type, Element element, String key, Object val) {
checkProperty(key, val);
void setPropertyForIndexes(Class<? extends Element> type, Element element, String key, Object val) {
try {
byte[] newByteVal = AccumuloByteSerializer.serialize(val);
Mutation m = null;
if (config.getAutoIndex() || getIndexedKeys(type).contains(key)) {
byte[] newByteVal = AccumuloByteSerializer.serialize(val);
Mutation m = null;
BatchWriter bw = getIndexBatchWriter(type);
Object old = element.getProperty(key);
if (old != null) {
@@ -902,15 +857,12 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
m.putDelete(key, element.getId().toString());
bw.addMutation(m);
}
m = new Mutation(newByteVal);
m.put(key.getBytes(), element.getId().toString().getBytes(), EMPTY);
bw.addMutation(m);
checkedFlush();
globals.checkedFlush();
}
getElementTableWrapper(type).writeProperty(element, key, val);
checkedFlush();
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
@@ -926,26 +878,23 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
return getVertexIndexWriter();
}
<T> T removeProperty(Class<? extends Element> type, Element element, String key) {
void removePropertyFromIndex(Class<? extends Element> type, Element element,
String key, Object value) {
if (StringFactory.LABEL.equals(key) || SLABEL.equals(key)) {
throw new AccumuloGraphException("Cannot remove the " + StringFactory.LABEL + " property.");
}
T obj = element.getProperty(key);
try {
if (obj != null) {
getElementTableWrapper(type).clearProperty(element, key);
byte[] val = AccumuloByteSerializer.serialize(obj);
if (value != null) {
byte[] val = AccumuloByteSerializer.serialize(value);
Mutation m = new Mutation(val);
m.putDelete(key, element.getId().toString());
getIndexBatchWriter(type).addMutation(m);
checkedFlush();
globals.checkedFlush();
}
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
return obj;
}
Vertex getEdgeVertex(String edgeId, Direction direction) {
@@ -985,7 +934,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
// internal methods used by this class
private void checkProperty(String key, Object val) {
void checkProperty(String key, Object val) {
nullCheckProperty(key, val);
if (key.equals(StringFactory.ID)) {
throw ExceptionFactory.propertyKeyIdIsReserved();
@@ -1134,7 +1083,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
if (bd != null)
bd.close();
}
checkedFlush();
globals.checkedFlush();
}
@Override
@@ -1152,7 +1101,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
checkedFlush();
globals.checkedFlush();
// Re Index Graph
BatchScanner scan = getElementBatchScanner(elementClass);
try {
@@ -1177,7 +1126,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
} finally {
scan.close();
}
checkedFlush();
globals.checkedFlush();
}

View File

@@ -12,6 +12,7 @@
package edu.jhuapl.tinkerpop;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
@@ -77,6 +78,19 @@ public class GlobalInstances {
return caches;
}
/**
* Flush the writer, if autoflush is enabled.
*/
public void checkedFlush() {
if (config.getAutoFlush()) {
try {
mtbw.flush();
} catch (MutationsRejectedException e) {
throw new AccumuloGraphException(e);
}
}
}
/**
* TODO: Refactor these away when the {@link #graph} member is gone.
* @param vertexWrapper

View File

@@ -13,6 +13,7 @@ package edu.jhuapl.tinkerpop;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Cache for storing element properties.
@@ -31,6 +32,14 @@ public class PropertyCache {
this.values = new HashMap<String, TimedValue>();
}
public boolean containsKey(String key) {
return values.containsKey(key);
}
public Set<String> keySet() {
return values.keySet();
}
public void put(String key, Object value) {
Integer timeout = getTimeout(key);
@@ -74,6 +83,11 @@ public class PropertyCache {
values.clear();
}
@Override
public String toString() {
return values.toString();
}
/**
* Return the timeout for the given key.
* Checks for a key-specific timeout
@@ -109,5 +123,10 @@ public class PropertyCache {
public Long getExpiry() {
return expiry;
}
@Override
public String toString() {
return "[" + value + ", " + expiry + "]";
}
}
}

View File

@@ -38,9 +38,11 @@ public class EdgeTableWrapper extends ElementTableWrapper {
*/
public void writeEdge(Edge edge) {
Mutators.apply(getWriter(), new EdgeMutator.Add(edge));
globals.checkedFlush();
}
public void deleteEdge(Edge edge) {
Mutators.apply(getWriter(), new EdgeMutator.Delete(edge));
globals.checkedFlush();
}
}

View File

@@ -171,6 +171,7 @@ public abstract class ElementTableWrapper extends BaseTableWrapper {
*/
public void clearProperty(Element element, String key) {
Mutators.apply(getWriter(), new ClearPropertyMutator(element, key));
globals.checkedFlush();
}
/**
@@ -182,6 +183,7 @@ public abstract class ElementTableWrapper extends BaseTableWrapper {
public void writeProperty(Element element, String key, Object value) {
Mutators.apply(getWriter(),
new WritePropertyMutator(element, key, value));
globals.checkedFlush();
}
/**

View File

@@ -49,6 +49,7 @@ public class VertexTableWrapper extends ElementTableWrapper {
*/
public void writeVertex(Vertex vertex) {
Mutators.apply(getWriter(), new AddVertexMutator(vertex));
globals.checkedFlush();
}
/**
@@ -57,10 +58,12 @@ public class VertexTableWrapper extends ElementTableWrapper {
*/
public void writeEdgeEndpoints(Edge edge) {
Mutators.apply(getWriter(), new EdgeEndpointsMutator.Add(edge));
globals.checkedFlush();
}
public void deleteEdgeEndpoints(Edge edge) {
Mutators.apply(getWriter(), new EdgeEndpointsMutator.Delete(edge));
globals.checkedFlush();
}
public Iterable<Edge> getEdges(Vertex vertex, Direction direction,

View File

@@ -0,0 +1,229 @@
/******************************************************************************
* COPYRIGHT NOTICE *
* Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory *
* All rights reserved. *
* *
* This material may only be used, modified, or reproduced by or for the *
* U.S. Government pursuant to the license rights granted under FAR clause *
* 52.227-14 or DFARS clauses 252.227-7013/7014. *
* *
* For any other permissions, please contact the Legal Office at JHU/APL. *
******************************************************************************/
package edu.jhuapl.tinkerpop;
import static org.junit.Assert.*;
import org.junit.Test;
import com.google.common.collect.Sets;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
/**
* Tests related to {@link Element}-based property caching.
*/
public class ElementPropertyCachingTest {
private static final int TIMEOUT = 300000;
private static final String NON_CACHED = "noncached";
private static final String CACHED = "cached";
@Test
public void testCachingDisabled() {
AccumuloGraphConfiguration cfg =
AccumuloGraphTestUtils.generateGraphConfig("cachingDisabled");
assertTrue(cfg.getPropertyCacheTimeout(null) <= 0);
assertTrue(cfg.getPropertyCacheTimeout(NON_CACHED) <= 0);
assertTrue(cfg.getPropertyCacheTimeout(CACHED) <= 0);
Graph graph = open(cfg);
load(graph);
AccumuloVertex a = (AccumuloVertex) graph.getVertex("A");
AccumuloVertex b = (AccumuloVertex) graph.getVertex("B");
AccumuloVertex c = (AccumuloVertex) graph.getVertex("C");
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(true, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(true, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
a.removeProperty(NON_CACHED);
b.removeProperty(NON_CACHED);
c.removeProperty(NON_CACHED);
a.removeProperty(CACHED);
b.removeProperty(CACHED);
c.removeProperty(CACHED);
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(null, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(null, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
graph.shutdown();
}
@Test
public void testSpecificCaching() {
AccumuloGraphConfiguration cfg =
AccumuloGraphTestUtils.generateGraphConfig("getProperty");
cfg.setPropertyCacheTimeout(CACHED, TIMEOUT);
assertTrue(cfg.getPropertyCacheTimeout(null) <= 0);
assertTrue(cfg.getPropertyCacheTimeout(NON_CACHED) <= 0);
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(CACHED));
Graph graph = open(cfg);
load(graph);
AccumuloVertex a = (AccumuloVertex) graph.getVertex("A");
AccumuloVertex b = (AccumuloVertex) graph.getVertex("B");
AccumuloVertex c = (AccumuloVertex) graph.getVertex("C");
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(true, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(true, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(true, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(CACHED), c.getPropertyCache().keySet());
a.removeProperty(NON_CACHED);
b.removeProperty(NON_CACHED);
c.removeProperty(NON_CACHED);
a.removeProperty(CACHED);
b.removeProperty(CACHED);
c.removeProperty(CACHED);
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(null, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(null, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
graph.shutdown();
}
@Test
public void testAllCaching() {
AccumuloGraphConfiguration cfg =
AccumuloGraphTestUtils.generateGraphConfig("setProperty");
cfg.setPropertyCacheTimeout(null, TIMEOUT);
cfg.setPropertyCacheTimeout(CACHED, TIMEOUT);
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(null));
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(NON_CACHED));
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(CACHED));
Graph graph = open(cfg);
load(graph);
AccumuloVertex a = (AccumuloVertex) graph.getVertex("A");
AccumuloVertex b = (AccumuloVertex) graph.getVertex("B");
AccumuloVertex c = (AccumuloVertex) graph.getVertex("C");
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(true, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(true, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(true, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(true, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(NON_CACHED), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(CACHED), c.getPropertyCache().keySet());
a.removeProperty(NON_CACHED);
b.removeProperty(NON_CACHED);
c.removeProperty(NON_CACHED);
a.removeProperty(CACHED);
b.removeProperty(CACHED);
c.removeProperty(CACHED);
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(null, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(null, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
graph.shutdown();
}
private static Graph open(AccumuloGraphConfiguration cfg) {
return GraphFactory.open(cfg);
}
private static void load(Graph graph) {
graph.addVertex("A");
graph.addVertex("B").setProperty(NON_CACHED, true);
graph.addVertex("C").setProperty(CACHED, true);
}
}