Merge pull request #99 from JHUAPL/table-wrappers

Table wrappers
This commit is contained in:
Ryan Webb
2015-01-28 13:03:55 -05:00
59 changed files with 3775 additions and 1555 deletions

View File

@@ -32,6 +32,12 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.edge.EdgeEndpointsMutator;
import edu.jhuapl.tinkerpop.mutator.edge.EdgeMutator;
import edu.jhuapl.tinkerpop.mutator.property.WritePropertyMutator;
import edu.jhuapl.tinkerpop.mutator.vertex.AddVertexMutator;
/**
* This class provides high-speed ingest into an {@link AccumuloGraph} instance
@@ -91,8 +97,8 @@ public final class AccumuloBulkIngester {
AccumuloGraphUtils.handleCreateAndClear(config);
mtbw = connector.createMultiTableBatchWriter(config.getBatchWriterConfig());
vertexWriter = mtbw.getBatchWriter(config.getVertexTable());
edgeWriter = mtbw.getBatchWriter(config.getEdgeTable());
vertexWriter = mtbw.getBatchWriter(config.getVertexTableName());
edgeWriter = mtbw.getBatchWriter(config.getEdgeTableName());
}
/**
@@ -108,9 +114,7 @@ public final class AccumuloBulkIngester {
* @throws MutationsRejectedException
*/
public PropertyBuilder addVertex(String id) throws MutationsRejectedException {
Mutation m = new Mutation(id);
m.put(AccumuloGraph.LABEL, AccumuloGraph.EXISTS, AccumuloGraph.EMPTY);
vertexWriter.addMutation(m);
Mutators.apply(vertexWriter, new AddVertexMutator(id));
return new PropertyBuilder(vertexWriter, id);
}
@@ -148,8 +152,7 @@ public final class AccumuloBulkIngester {
* @throws MutationsRejectedException
*/
public PropertyBuilder addEdge(String src, String dest, String label) throws MutationsRejectedException {
String eid = UUID.randomUUID().toString();
return addEdge(eid, src, dest, label);
return addEdge(UUID.randomUUID().toString(), src, dest, label);
}
/**
@@ -168,16 +171,8 @@ public final class AccumuloBulkIngester {
* @throws MutationsRejectedException
*/
public PropertyBuilder addEdge(String id, String src, String dest, String label) throws MutationsRejectedException {
Mutation m = new Mutation(id);
m.put(AccumuloGraph.LABEL, (dest + "_" + src).getBytes(), AccumuloByteSerializer.serialize(label));
edgeWriter.addMutation(m);
m = new Mutation(dest);
m.put(AccumuloGraph.INEDGE, (src + AccumuloGraph.IDDELIM + id).getBytes(), (AccumuloGraph.IDDELIM + label).getBytes());
vertexWriter.addMutation(m);
m = new Mutation(src);
m.put(AccumuloGraph.OUTEDGE, (dest + AccumuloGraph.IDDELIM + id).getBytes(), (AccumuloGraph.IDDELIM + label).getBytes());
vertexWriter.addMutation(m);
Mutators.apply(edgeWriter, new EdgeMutator.Add(id, src, dest, label));
Mutators.apply(vertexWriter, new EdgeEndpointsMutator.Add(id, src, dest, label));
return new PropertyBuilder(edgeWriter, id);
}
@@ -210,10 +205,7 @@ public final class AccumuloBulkIngester {
* @throws MutationsRejectedException
*/
private void addProperty(BatchWriter writer, String id, String key, Object value) throws MutationsRejectedException {
byte[] newByteVal = AccumuloByteSerializer.serialize(value);
Mutation m = new Mutation(id);
m.put(key.getBytes(), AccumuloGraph.EMPTY, newByteVal);
writer.addMutation(m);
Mutators.apply(writer, new WritePropertyMutator(id, key, value));
}
/**
@@ -279,12 +271,12 @@ public final class AccumuloBulkIngester {
*/
public final class PropertyBuilder {
Mutation mutation;
BatchWriter writer;
final String id;
final BatchWriter writer;
PropertyBuilder(BatchWriter writer, String id) {
this.writer = writer;
this.mutation = new Mutation(id);
this.id = id;
}
/**
@@ -296,7 +288,13 @@ public final class AccumuloBulkIngester {
* @return
*/
public PropertyBuilder add(String key, Object value) {
mutation.put(key.getBytes(), AccumuloGraph.EMPTY, AccumuloByteSerializer.serialize(value));
for (Mutation m : new WritePropertyMutator(id, key, value).create()) {
try {
writer.addMutation(m);
} catch (MutationsRejectedException e) {
throw new AccumuloGraphException(e);
}
}
return this;
}
@@ -306,9 +304,7 @@ public final class AccumuloBulkIngester {
* @throws MutationsRejectedException
*/
public void finish() throws MutationsRejectedException {
if (mutation.size() > 0) {
writer.addMutation(mutation);
}
// No-op since Mutations are now added on the fly.
}
/**
@@ -317,7 +313,7 @@ public final class AccumuloBulkIngester {
* @return
*/
public String getId() {
return new String(mutation.getRow());
return id;
}
}
}

View File

@@ -26,21 +26,21 @@ import javax.xml.namespace.QName;
public final class AccumuloByteSerializer {
static final int NULL = 'n';
public static final int NULL = 'n';
static final int BYTE = 'b';
static final int SHORT = 's';
static final int CHARACTER = 'c';
static final int INTEGER = 'i';
static final int LONG = 'l';
static final int FLOAT = 'f';
static final int DOUBLE = 'd';
static final int BOOLEAN = 'o';
static final int DATE = 't';
static final int ENUM = 'e';
static final int STRING = 'a';
static final int SERIALIZABLE = 'x';
static final int QNAME = 'q';
public static final int BYTE = 'b';
public static final int SHORT = 's';
public static final int CHARACTER = 'c';
public static final int INTEGER = 'i';
public static final int LONG = 'l';
public static final int FLOAT = 'f';
public static final int DOUBLE = 'd';
public static final int BOOLEAN = 'o';
public static final int DATE = 't';
public static final int ENUM = 'e';
public static final int STRING = 'a';
public static final int SERIALIZABLE = 'x';
public static final int QNAME = 'q';
private AccumuloByteSerializer() {
@@ -53,6 +53,7 @@ public final class AccumuloByteSerializer {
}
};
@SuppressWarnings("unchecked")
public static <T> T deserialize(byte[] target) {
if (target[0] == NULL) {
return null;
@@ -92,6 +93,7 @@ public final class AccumuloByteSerializer {
case ENUM:
try {
String[] s = new String(target, 1, target.length - 1).split(":");
@SuppressWarnings("rawtypes")
Class<? extends Enum> clz = (Class<? extends Enum>) Class.forName(s[0]);
return (T) Enum.valueOf(clz, s[1]);
} catch (ClassNotFoundException cnfe) {

View File

@@ -14,70 +14,50 @@
*/
package edu.jhuapl.tinkerpop;
import java.util.Map;
import org.apache.log4j.Logger;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.ExceptionFactory;
import com.tinkerpop.blueprints.util.StringFactory;
/**
* TODO
*/
public class AccumuloEdge extends AccumuloElement implements Edge {
String label;
String inId;
String outId;
Vertex inVertex;
Vertex outVertex;
private static final Logger log = Logger.getLogger(AccumuloEdge.class);
AccumuloEdge(AccumuloGraph parent, String id) {
this(parent, id, null);
private String label;
private Vertex inVertex;
private Vertex outVertex;
public AccumuloEdge(GlobalInstances globals, String id) {
this(globals, id, null, null, null);
}
AccumuloEdge(AccumuloGraph parent, String id, String label) {
this(parent, id, label, (Vertex) null, (Vertex) null);
}
AccumuloEdge(AccumuloGraph parent, String id, String label, Vertex inVertex, Vertex outVertex) {
super(parent, id, Edge.class);
public AccumuloEdge(GlobalInstances globals, String id,
Vertex inVertex, Vertex outVertex, String label) {
super(globals, id, Edge.class);
this.label = label;
this.inVertex = inVertex;
this.outVertex = outVertex;
}
AccumuloEdge(AccumuloGraph parent, String id, String label, String inVertex, String outVertex) {
super(parent, id, Edge.class);
this.label = label;
this.inId = inVertex;
this.outId = outVertex;
}
@Override
public Vertex getVertex(Direction direction) throws IllegalArgumentException {
switch (direction) {
case IN:
if (inVertex == null) {
if (inId == null) {
inVertex = parent.getEdgeVertex(id, direction);
inId = inVertex.getId().toString();
} else {
inVertex = parent.getVertex(inId);
}
}
return inVertex;
case OUT:
if (outVertex == null) {
if (outId == null) {
outVertex = parent.getEdgeVertex(id, direction);
outId = outVertex.getId().toString();
} else {
outVertex = parent.getVertex(outId);
}
}
return outVertex;
case BOTH:
throw ExceptionFactory.bothIsNotSupported();
default:
throw new RuntimeException("Unexpected direction: " + direction);
if (!Direction.IN.equals(direction) && !Direction.OUT.equals(direction)) {
throw new IllegalArgumentException("Invalid direction: "+direction);
}
// The vertex information needs to be loaded.
if (inVertex == null || outVertex == null || label == null) {
log.debug("Loading information for edge: "+this);
globals.getEdgeWrapper().loadEndpointsAndLabel(this);
}
return Direction.IN.equals(direction) ? inVertex : outVertex;
}
@Override
@@ -91,31 +71,45 @@ public class AccumuloEdge extends AccumuloElement implements Edge {
@Override
public void remove() {
parent.removeEdge(this);
// Remove from named indexes.
super.removeElementFromNamedIndexes();
// If edge was removed already, forget it.
// This may happen due to self-loops...
if (!globals.getEdgeWrapper().elementExists(id)) {
return;
}
// Remove properties from key/value indexes.
Map<String, Object> props = globals.getEdgeWrapper()
.readAllProperties(this);
for (String key : props.keySet()) {
globals.getEdgeKeyIndexWrapper().removePropertyFromIndex(this,
key, props.get(key));
}
// Get rid of the endpoints and edge themselves.
globals.getVertexWrapper().deleteEdgeEndpoints(this);
globals.getEdgeWrapper().deleteEdge(this);
// Remove element from cache.
globals.getCaches().remove(id, Edge.class);
globals.checkedFlush();
}
public String getInId() {
return inId;
public void setVertices(AccumuloVertex inVertex, AccumuloVertex outVertex) {
this.inVertex = inVertex;
this.outVertex = outVertex;
}
public String getOutId() {
return outId;
}
protected void setInId(String id){
inId = id;
}
protected void setOutId(String id){
outId = id;
}
protected void setLabel(String label){
public void setLabel(String label) {
this.label = label;
}
@Override
public String toString() {
return "[" + getId() + ":" + getVertex(Direction.OUT) + " -> " + getLabel() + " -> " + getVertex(Direction.IN) + "]";
return "[" + getId() + ":" + inVertex + " -> " + label + " -> " + outVertex + "]";
}
}

View File

@@ -16,71 +16,153 @@ package edu.jhuapl.tinkerpop;
import java.util.Set;
import org.apache.accumulo.core.util.Pair;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Index;
import com.tinkerpop.blueprints.util.StringFactory;
import edu.jhuapl.tinkerpop.cache.PropertyCache;
/**
* TODO
*/
public abstract class AccumuloElement implements Element {
protected AccumuloGraph parent;
protected GlobalInstances globals;
protected String id;
private Class<? extends Element> type;
private PropertyCache propertyCache;
protected AccumuloElement(AccumuloGraph parent, String id, Class<? extends Element> type) {
this.parent = parent;
protected AccumuloElement(GlobalInstances globals,
String id, Class<? extends Element> type) {
this.globals = globals;
this.id = id;
this.type = type;
}
@Override
public <T> T getProperty(String key) {
/**
* Create properties cache if it doesn't exist,
* and preload any properties.
*/
private void makeCache() {
if (propertyCache == null) {
// Lazily create the properties cache.
// We will create it here just in case the parent does not actually
// pre-load any data. Note it also may be created in the
// cacheProperty method, as well, in the event a class pre-loads
// data before a call is made to obtain it.
propertyCache = new PropertyCache(parent.config);
propertyCache = new PropertyCache(globals.getConfig());
parent.preloadProperties(this, type);
}
T val = propertyCache.get(key);
if (val != null) {
return val;
} else {
Pair<Integer, T> pair = parent.getProperty(type, id, key);
if (pair.getFirst() != null) {
cacheProperty(key, pair.getSecond());
// Preload any keys, if needed.
String[] preloadKeys = globals.getConfig().getPreloadedProperties();
if (preloadKeys != null) {
propertyCache.putAll(globals.getElementWrapper(type)
.readProperties(this, preloadKeys));
}
return pair.getSecond();
}
}
@Override
public <T> T getProperty(String key) {
makeCache();
// Get from property cache.
T value = propertyCache.get(key);
// If not cached, get it from the backing table.
if (value == null) {
value = globals.getElementWrapper(type).readProperty(this, key);
}
// Cache the new value.
if (value != null) {
propertyCache.put(key, value);
}
return value;
}
@Override
public Set<String> getPropertyKeys() {
return parent.getPropertyKeys(type, id);
return globals.getElementWrapper(type).readPropertyKeys(this);
}
@Override
public void setProperty(String key, Object value) {
parent.setProperty(type, id, key, value);
cacheProperty(key, value);
makeCache();
globals.getKeyIndexTableWrapper(type).setPropertyForIndex(this, key, value);
// MDL 31 Dec 2014: The above calls getProperty, so this
// order is important (for now).
globals.getElementWrapper(type).writeProperty(this, key, value);
globals.checkedFlush();
setPropertyInMemory(key, value);
}
/**
* Set a property but only in the instantiated object,
* not in the backing store.
* @param key
* @param value
*/
public void setPropertyInMemory(String key, Object value) {
makeCache();
propertyCache.put(key, value);
}
@Override
public <T> T removeProperty(String key) {
if (propertyCache != null) {
// we have the cached value but we still need to pass this on to the
// parent so it can actually remove the data from the backing store.
// Since we have to do that anyway, we will use the parent's value
// instead of the cache value to to be as up-to-date as possible.
// Of course we still need to clear out the cached value...
propertyCache.remove(key);
if (StringFactory.LABEL.equals(key) ||
Constants.LABEL.equals(key)) {
throw new AccumuloGraphException("Cannot remove the " + StringFactory.LABEL + " property.");
}
return parent.removeProperty(type, id, key);
makeCache();
T value = getProperty(key);
if (value != null) {
globals.getElementWrapper(type).clearProperty(this, key);
globals.checkedFlush();
}
globals.getKeyIndexTableWrapper(type).removePropertyFromIndex(this, key, value);
// MDL 31 Dec 2014: AccumuloGraph.removeProperty
// calls getProperty which populates the cache.
// So the order here is important (for now).
removePropertyInMemory(key);
return value;
}
/**
* Remove element from all named indexes.
* @param element
*/
protected void removeElementFromNamedIndexes() {
for (Index<? extends Element> index : globals.getNamedIndexListWrapper().getIndices()) {
((AccumuloIndex<? extends Element>) index).getWrapper().removeElementFromIndex(this);
}
}
/**
* Remove a property but only in the instantiated
* object, not the backing store.
* @param key
*/
public void removePropertyInMemory(String key) {
makeCache();
propertyCache.remove(key);
}
/**
* Return the properties currently cached in memory.
* @return
*/
public Iterable<String> getPropertyKeysInMemory() {
makeCache();
return propertyCache.keySet();
}
/**
* Retrieve a property from memory.
* @param key
* @return
*/
public Object getPropertyInMemory(String key) {
makeCache();
return propertyCache.get(key);
}
@Override
@@ -97,19 +179,20 @@ public abstract class AccumuloElement implements Element {
} else if (!obj.getClass().equals(getClass())) {
return false;
} else {
return this.id.equals(((AccumuloElement) obj).id);
return id.equals(((AccumuloElement) obj).id);
}
}
@Override
public int hashCode() {
return getClass().hashCode() ^ id.hashCode();
}
void cacheProperty(String key, Object value) {
if (propertyCache == null) {
propertyCache = new PropertyCache(parent.config);
}
propertyCache.put(key, value);
/**
* Internal method for unit tests.
* @return
*/
PropertyCache getPropertyCache() {
return propertyCache;
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -43,6 +43,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.IndexableGraph;
import com.tinkerpop.blueprints.KeyIndexableGraph;
@@ -69,6 +70,11 @@ implements Serializable {
private MiniAccumuloCluster accumuloMiniCluster;
/**
* The {@link AccumuloGraph} class.
*/
public static final Class<? extends Graph> ACCUMULO_GRAPH_CLASS = AccumuloGraph.class;
/**
* The fully-qualified class name of the class that implements
* the TinkerPop Graph interface.
@@ -76,7 +82,7 @@ implements Serializable {
* which type to instantiate.
*/
public static final String ACCUMULO_GRAPH_CLASSNAME =
AccumuloGraph.class.getCanonicalName();
ACCUMULO_GRAPH_CLASS.getCanonicalName();
/**
* An enumeration used by {@link AccumuloGraphConfiguration#setInstanceType(InstanceType)}
@@ -922,42 +928,71 @@ implements Serializable {
return this;
}
public String getVertexTable() {
/**
* Name of vertex table (keyed by vertex id).
* @return
*/
public String getVertexTableName() {
return getGraphName() + "_vertex";
}
public String getEdgeTable() {
/**
* Name of edge table (keyed by edge id).
* @return
*/
public String getEdgeTableName() {
return getGraphName() + "_edge";
}
public String getKeyMetadataTable() {
return getMetadataTable() + "KEY";
/**
* Name of vertex key index table (keyed on
* vertex property keys).
* @return
*/
public String getVertexKeyIndexTableName() {
return getGraphName() + "_vertex_key_index";
}
String getVertexIndexTable() {
return getGraphName() + "_vertex_index";
/**
* Name of edge key index table (keyed on
* edge property keys).
* @return
*/
public String getEdgeKeyIndexTableName() {
return getGraphName() + "_edge_key_index";
}
String getEdgeIndexTable() {
return getGraphName() + "_edge_index";
/**
* Table of the index with given name (keyed
* on property keys of the given element type).
* @param indexName
* @return
*/
public String getNamedIndexTableName(String indexName) {
return getGraphName() + "_index_" + indexName;
}
String getMetadataTable() {
return getGraphName() + "_meta";
/**
* Table listing the key-indexed properties
* of elements.
* @return
*/
public String getIndexedKeysTableName() {
return getGraphName() + "_indexed_keys";
}
String getKeyVertexIndexTable() {
return getGraphName() + "_vertex_index_key";
}
String getKeyEdgeIndexTable() {
return getGraphName() + "_edge_index_key";
/**
* Table of existing named indexes.
* @return
*/
public String getIndexNamesTableName() {
return getGraphName() + "_index_names";
}
List<String> getTableNames() {
return Arrays.asList(getVertexTable(),
getEdgeTable(), getVertexIndexTable(), getEdgeIndexTable(),
getMetadataTable(), getKeyMetadataTable());
return Arrays.asList(getVertexTableName(),
getEdgeTableName(), getVertexKeyIndexTableName(), getEdgeKeyIndexTableName(),
getIndexNamesTableName(), getIndexedKeysTableName());
}
/**

View File

@@ -15,11 +15,15 @@
package edu.jhuapl.tinkerpop;
import java.util.SortedSet;
import java.util.UUID;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.hadoop.io.Text;
final class AccumuloGraphUtils {
import com.tinkerpop.blueprints.util.ExceptionFactory;
import com.tinkerpop.blueprints.util.StringFactory;
public final class AccumuloGraphUtils {
/**
* Create and/or clear existing graph tables for the given configuration.
@@ -76,4 +80,45 @@ final class AccumuloGraphUtils {
throw new IllegalArgumentException(e);
}
}
/**
* Generate an element id.
* @return
*/
public static String generateId() {
return UUID.randomUUID().toString();
}
/**
* Ensure that the given key/value don't conflict with
* Blueprints reserved words.
* @param key
* @param value
*/
public static void validateProperty(String key, Object value) {
nullCheckProperty(key, value);
if (key.equals(StringFactory.ID)) {
throw ExceptionFactory.propertyKeyIdIsReserved();
} else if (key.equals(StringFactory.LABEL)) {
throw ExceptionFactory.propertyKeyLabelIsReservedForEdges();
} else if (value == null) {
throw ExceptionFactory.propertyValueCanNotBeNull();
}
}
/**
* Disallow null keys/values and throw appropriate
* Blueprints exceptions.
* @param key
* @param value
*/
public static void nullCheckProperty(String key, Object value) {
if (key == null) {
throw ExceptionFactory.propertyKeyCanNotBeNull();
} else if (value == null) {
throw ExceptionFactory.propertyValueCanNotBeNull();
} else if (key.trim().equals(StringFactory.EMPTY_STRING)) {
throw ExceptionFactory.propertyKeyCanNotBeEmpty();
}
}
}

View File

@@ -15,82 +15,76 @@
package edu.jhuapl.tinkerpop;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.CloseableIterable;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Index;
import com.tinkerpop.blueprints.IndexableGraph;
import edu.jhuapl.tinkerpop.tables.namedindex.NamedIndexTableWrapper;
/**
* Accumulo-based implementation for {@link IndexableGraph}.
* @param <T>
*/
public class AccumuloIndex<T extends Element> implements Index<T> {
Class<T> indexedType;
AccumuloGraph parent;
String indexName;
String tableName;
private final GlobalInstances globals;
private final Class<T> indexedType;
private final String indexName;
private final NamedIndexTableWrapper indexWrapper;
public AccumuloIndex(Class<T> t, AccumuloGraph parent, String indexName) {
this.indexedType = t;
this.parent = parent;
public AccumuloIndex(GlobalInstances globals, String indexName, Class<T> indexedType) {
this.globals = globals;
this.indexName = indexName;
tableName = parent.config.getGraphName() + "_index_" + indexName;// + "_" +
// t;
this.indexedType = indexedType;
try {
if (!parent.config.getConnector().tableOperations().exists(tableName)) {
parent.config.getConnector().tableOperations().create(tableName);
if (!globals.getConfig().getConnector()
.tableOperations().exists(getTableName())) {
globals.getConfig().getConnector()
.tableOperations().create(getTableName());
}
} catch (Exception e) {
throw new RuntimeException(e);
throw new AccumuloGraphException(e);
}
indexWrapper = new NamedIndexTableWrapper(globals, indexedType, indexName);
}
@Override
public String getIndexName() {
return indexName;
}
public String getTableName() {
return globals.getConfig().getNamedIndexTableName(indexName);
}
public NamedIndexTableWrapper getWrapper() {
return indexWrapper;
}
@Override
public Class<T> getIndexClass() {
return indexedType;
}
@Override
public void put(String key, Object value, Element element) {
element.setProperty(key, value);
Mutation m = new Mutation(AccumuloByteSerializer.serialize(value));
m.put(key.getBytes(), element.getId().toString().getBytes(), "".getBytes());
BatchWriter w = getWriter();
try {
w.addMutation(m);
w.flush();
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
indexWrapper.setPropertyForIndex(element, key, value, true);
}
@Override
public CloseableIterable<T> get(String key, Object value) {
Scanner scan = getScanner();
byte[] id = AccumuloByteSerializer.serialize(value);
scan.setRange(new Range(new Text(id), new Text(id)));
scan.fetchColumnFamily(new Text(key));
return new IndexIterable(parent, scan, indexedType);
return indexWrapper.readElementsFromIndex(key, value);
}
@Override
public CloseableIterable<T> query(String key, Object query) {
throw new UnsupportedOperationException();
}
@Override
public long count(String key, Object value) {
CloseableIterable<T> iterable = get(key, value);
Iterator<T> iter = iterable.iterator();
@@ -103,75 +97,8 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
return count;
}
public void remove(String key, Object value, Element element) {
Mutation m = new Mutation(AccumuloByteSerializer.serialize(value));
m.putDelete(key.getBytes(), element.getId().toString().getBytes());
BatchWriter w = getWriter();
try {
w.addMutation(m);
w.flush();
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
}
private BatchWriter getWriter() {
return parent.getWriter(tableName);
}
private Scanner getScanner() {
return parent.getScanner(tableName);
}
public class IndexIterable implements CloseableIterable<T> {
AccumuloGraph parent;
ScannerBase scan;
boolean isClosed;
Class<T> indexedType;
IndexIterable(AccumuloGraph parent, ScannerBase scan, Class<T> t) {
this.scan = scan;
this.parent = parent;
isClosed = false;
indexedType = t;
}
public Iterator<T> iterator() {
if (!isClosed) {
return new ScannerIterable<T>(parent, scan) {
@Override
public T next(PeekingIterator<Entry<Key, Value>> iterator) {
String id = iterator.next()
.getKey().getColumnQualifier().toString();
// TODO better use of information readily
// available...
if (indexedType.equals(Edge.class)) {
return (T) new AccumuloEdge(parent, id);
}
else {
return (T) new AccumuloVertex(parent, id);
}
}
}.iterator();
}
else {
return null;
}
}
public void close() {
if (!isClosed) {
scan.close();
isClosed = true;
}
}
}
@Override
public Class<T> getIndexClass() {
return indexedType;
public void remove(String key, Object value, Element element) {
indexWrapper.removePropertyFromIndex(element, key, value);
}
}

View File

@@ -14,26 +14,32 @@
*/
package edu.jhuapl.tinkerpop;
import java.util.Map;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.VertexQuery;
import com.tinkerpop.blueprints.util.DefaultVertexQuery;
import com.tinkerpop.blueprints.util.ExceptionFactory;
/**
* TODO
*/
public class AccumuloVertex extends AccumuloElement implements Vertex {
AccumuloVertex(AccumuloGraph parent, String id) {
super(parent, id, Vertex.class);
public AccumuloVertex(GlobalInstances globals, String id) {
super(globals, id, Vertex.class);
}
@Override
public Iterable<Edge> getEdges(Direction direction, String... labels) {
return parent.getEdges(id, direction, labels);
return globals.getVertexWrapper().getEdges(this, direction, labels);
}
@Override
public Iterable<Vertex> getVertices(Direction direction, String... labels) {
return parent.getVertices(id, direction, labels);
return globals.getVertexWrapper().getVertices(this, direction, labels);
}
@Override
@@ -43,12 +49,72 @@ public class AccumuloVertex extends AccumuloElement implements Vertex {
@Override
public Edge addEdge(String label, Vertex inVertex) {
return parent.addEdge(null, this, inVertex, label);
return addEdge(null, label, inVertex);
}
/**
* Add an edge as with {@link #addEdge(String, Vertex)},
* but with a specified edge id.
* @param id
* @param label
* @param inVertex
* @return
*/
public Edge addEdge(Object id, String label, Vertex inVertex) {
if (label == null) {
throw ExceptionFactory.edgeLabelCanNotBeNull();
}
if (id == null) {
id = AccumuloGraphUtils.generateId();
}
String myID = id.toString();
AccumuloEdge edge = new AccumuloEdge(globals, myID, inVertex, this, label);
// TODO we arent suppose to make sure the given edge ID doesn't already
// exist?
globals.getEdgeWrapper().writeEdge(edge);
globals.getVertexWrapper().writeEdgeEndpoints(edge);
globals.checkedFlush();
globals.getCaches().cache(edge, Edge.class);
return edge;
}
@Override
public void remove() {
parent.removeVertex(this);
globals.getCaches().remove(getId(), Vertex.class);
super.removeElementFromNamedIndexes();
// Throw exception if the element does not exist.
if (!globals.getVertexWrapper().elementExists(id)) {
throw ExceptionFactory.vertexWithIdDoesNotExist(getId());
}
// Remove properties from key/value indexes.
Map<String, Object> props = globals.getVertexWrapper()
.readAllProperties(this);
for (String key : props.keySet()) {
globals.getVertexKeyIndexWrapper().removePropertyFromIndex(this,
key, props.get(key));
}
// Remove edges incident to this vertex.
for (Edge edge : getEdges(Direction.BOTH)) {
edge.remove();
}
globals.checkedFlush();
// Get rid of the vertex.
globals.getVertexWrapper().deleteVertex(this);
globals.checkedFlush();
}
@Override

View File

@@ -0,0 +1,37 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
/**
* Collect up various constants here.
* @author Michael Lieberman
*
*/
public class Constants {
private Constants() { }
public static final String ID_DELIM = "__DELIM__";
public static final byte[] EMPTY = new byte[0];
/**
* Prefixes for various Accumulo entries.
*/
public static final String LABEL = "__LABEL__";
public static final String IN_EDGE = "__INEDGE__";
public static final String OUT_EDGE = "__OUTEDGE__";
public static final String EXISTS = "__EXISTS__";
}

View File

@@ -0,0 +1,119 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.cache.ElementCaches;
import edu.jhuapl.tinkerpop.tables.core.EdgeTableWrapper;
import edu.jhuapl.tinkerpop.tables.core.ElementTableWrapper;
import edu.jhuapl.tinkerpop.tables.core.VertexTableWrapper;
import edu.jhuapl.tinkerpop.tables.keyindex.BaseKeyIndexTableWrapper;
import edu.jhuapl.tinkerpop.tables.keyindex.EdgeKeyIndexTableWrapper;
import edu.jhuapl.tinkerpop.tables.keyindex.IndexedKeysListTableWrapper;
import edu.jhuapl.tinkerpop.tables.keyindex.VertexKeyIndexTableWrapper;
import edu.jhuapl.tinkerpop.tables.namedindex.NamedIndexListTableWrapper;
/**
* Internal class gathering together instances of
* objects needed for various AccumuloGraph components.
*/
public class GlobalInstances {
private final AccumuloGraphConfiguration config;
private final MultiTableBatchWriter mtbw;
private final ElementCaches caches;
public GlobalInstances(AccumuloGraphConfiguration config,
MultiTableBatchWriter mtbw, ElementCaches caches) {
this.config = config;
this.mtbw = mtbw;
this.caches = caches;
}
public AccumuloGraphConfiguration getConfig() {
return config;
}
public MultiTableBatchWriter getMtbw() {
return mtbw;
}
public VertexTableWrapper getVertexWrapper() {
return new VertexTableWrapper(this);
}
public EdgeTableWrapper getEdgeWrapper() {
return new EdgeTableWrapper(this);
}
public VertexKeyIndexTableWrapper getVertexKeyIndexWrapper() {
return new VertexKeyIndexTableWrapper(this);
}
public EdgeKeyIndexTableWrapper getEdgeKeyIndexWrapper() {
return new EdgeKeyIndexTableWrapper(this);
}
public IndexedKeysListTableWrapper getIndexedKeysListWrapper() {
return new IndexedKeysListTableWrapper(this);
}
public NamedIndexListTableWrapper getNamedIndexListWrapper() {
return new NamedIndexListTableWrapper(this);
}
public <T extends Element> ElementTableWrapper getElementWrapper(Class<T> clazz) {
if (Vertex.class.equals(clazz)) {
return getVertexWrapper();
} else if (Edge.class.equals(clazz)) {
return getEdgeWrapper();
} else {
throw new AccumuloGraphException("Unrecognized class: "+clazz);
}
}
public <T extends Element> BaseKeyIndexTableWrapper getKeyIndexTableWrapper(Class<T> clazz) {
if (Vertex.class.equals(clazz)) {
return getVertexKeyIndexWrapper();
} else if (Edge.class.equals(clazz)) {
return getEdgeKeyIndexWrapper();
} else {
throw new AccumuloGraphException("Unrecognized class: "+clazz);
}
}
public ElementCaches getCaches() {
return caches;
}
/**
* Flush the writer, if autoflush is enabled.
*/
public void checkedFlush() {
if (config.getAutoFlush()) {
try {
mtbw.flush();
} catch (MutationsRejectedException e) {
throw new AccumuloGraphException(e);
}
}
}
}

View File

@@ -14,7 +14,6 @@
*/
package edu.jhuapl.tinkerpop;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -23,15 +22,17 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import com.tinkerpop.blueprints.CloseableIterable;
import com.tinkerpop.blueprints.Element;
public abstract class ScannerIterable<T extends Element> implements Iterable<T>, Closeable {
/**
* TODO
*/
public abstract class ScannerIterable<T extends Element> implements CloseableIterable<T> {
AccumuloGraph parent;
ScannerBase scanner;
private ScannerBase scanner;
ScannerIterable(AccumuloGraph parent, ScannerBase scanner) {
this.parent = parent;
public ScannerIterable(ScannerBase scanner) {
this.scanner = scanner;
}
@@ -55,10 +56,10 @@ public abstract class ScannerIterable<T extends Element> implements Iterable<T>,
close();
}
class ScannerIterator implements Iterator<T> {
PeekingIterator<Entry<Key,Value>> iterator;
private class ScannerIterator implements Iterator<T> {
private PeekingIterator<Entry<Key,Value>> iterator;
ScannerIterator(PeekingIterator<Entry<Key,Value>> iterator) {
private ScannerIterator(PeekingIterator<Entry<Key,Value>> iterator) {
this.iterator = iterator;
}
@@ -76,7 +77,5 @@ public abstract class ScannerIterable<T extends Element> implements Iterable<T>,
public void remove() {
throw new UnsupportedOperationException();
}
}
}

View File

@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
package edu.jhuapl.tinkerpop.cache;
import java.util.concurrent.TimeUnit;

View File

@@ -0,0 +1,77 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.cache;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
/**
* Utility class wrapping element caches.
*/
public class ElementCaches {
private ElementCache<Vertex> vertexCache;
private ElementCache<Edge> edgeCache;
public ElementCaches(AccumuloGraphConfiguration config) {
if (config.getVertexCacheEnabled()) {
vertexCache = new ElementCache<Vertex>(config.getVertexCacheSize(),
config.getVertexCacheTimeout());
}
if (config.getEdgeCacheEnabled()) {
edgeCache = new ElementCache<Edge>(config.getEdgeCacheSize(),
config.getEdgeCacheTimeout());
}
}
public <T extends Element> void cache(T element, Class<T> clazz) {
if (pick(clazz) != null) {
pick(clazz).cache(element);
}
}
public <T extends Element> T retrieve(Object id, Class<T> clazz) {
return pick(clazz) != null ? pick(clazz).retrieve(id) : null;
}
public <T extends Element> void remove(Object id, Class<T> clazz) {
if (pick(clazz) != null) {
pick(clazz).remove(id);
}
}
public <T extends Element> void clear(Class<T> clazz) {
if (pick(clazz) != null) {
pick(clazz).clear();
}
}
@SuppressWarnings("unchecked")
private <T extends Element> ElementCache<T> pick(Class<T> clazz) {
if (Vertex.class.equals(clazz)) {
return (ElementCache<T>) vertexCache;
}
else if (Edge.class.equals(clazz)) {
return (ElementCache<T>) edgeCache;
}
else {
throw new AccumuloGraphException("Unknown element class: "+clazz);
}
}
}

View File

@@ -12,10 +12,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
package edu.jhuapl.tinkerpop.cache;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
/**
* Cache for storing element properties.
@@ -34,6 +37,14 @@ public class PropertyCache {
this.values = new HashMap<String, TimedValue>();
}
public boolean containsKey(String key) {
return values.containsKey(key);
}
public Set<String> keySet() {
return values.keySet();
}
public void put(String key, Object value) {
Integer timeout = getTimeout(key);
@@ -45,6 +56,13 @@ public class PropertyCache {
timeout != null ? System.currentTimeMillis() + timeout : null));
}
public void putAll(Map<String, Object> entries) {
for (String key : entries.keySet()) {
put(key, entries.get(key));
}
}
@SuppressWarnings("unchecked")
public <T> T get(String key) {
long now = System.currentTimeMillis();
@@ -71,6 +89,11 @@ public class PropertyCache {
values.clear();
}
@Override
public String toString() {
return values.toString();
}
/**
* Return the timeout for the given key.
* Checks for a key-specific timeout
@@ -106,5 +129,10 @@ public class PropertyCache {
public Long getExpiry() {
return expiry;
}
@Override
public String toString() {
return "[" + value + ", " + expiry + "]";
}
}
}

View File

@@ -24,6 +24,7 @@ import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloGraph;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
import edu.jhuapl.tinkerpop.Constants;
public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
@@ -84,10 +85,10 @@ public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
String eid = currentKey.getRow().toString();
String colf = currentKey.getColumnFamily().toString();
switch (colf) {
case AccumuloGraph.SLABEL:
case Constants.LABEL:
currentK.set(eid);
edge.prepareId(eid);
String[] ids = currentKey.getColumnQualifier().toString().split(parent.IDDELIM);
String[] ids = currentKey.getColumnQualifier().toString().split(Constants.ID_DELIM);
edge.setSourceId(ids[1]);
edge.setDestId(ids[0]);
edge.setLabel(AccumuloByteSerializer.deserialize(entry.getValue().get()).toString());
@@ -109,7 +110,7 @@ public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
public static void setAccumuloGraphConfiguration(Job job, AccumuloGraphConfiguration cfg) throws AccumuloSecurityException {
EdgeInputFormat.setConnectorInfo(job, cfg.getUser(), new PasswordToken(cfg.getPassword()));
EdgeInputFormat.setInputTableName(job, cfg.getEdgeTable());
EdgeInputFormat.setInputTableName(job, cfg.getEdgeTableName());
if (cfg.getInstanceType().equals(InstanceType.Mock)) {
EdgeInputFormat.setMockInstance(job, cfg.getInstanceName());
} else {

View File

@@ -92,9 +92,9 @@ public class ElementOutputFormat extends OutputFormat<NullWritable,Element> {
try {
if (bw == null) {
if (ele instanceof MapReduceVertex) {
bw = config.getConnector().createBatchWriter(config.getVertexTable(), config.getBatchWriterConfig());
bw = config.getConnector().createBatchWriter(config.getVertexTableName(), config.getBatchWriterConfig());
} else {
bw = config.getConnector().createBatchWriter(config.getEdgeTable(), config.getBatchWriterConfig());
bw = config.getConnector().createBatchWriter(config.getEdgeTableName(), config.getBatchWriterConfig());
}
}

View File

@@ -64,6 +64,7 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
return id;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getProperty(String key) {

View File

@@ -24,6 +24,7 @@ import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloGraph;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
import edu.jhuapl.tinkerpop.Constants;
public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
static AccumuloGraphConfiguration conf;
@@ -85,17 +86,17 @@ public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
String vid = currentKey.getRow().toString();
String colf = currentKey.getColumnFamily().toString();
switch (colf) {
case AccumuloGraph.SLABEL:
case Constants.LABEL:
currentK.set(vid);
vertex.prepareId(vid);
break;
case AccumuloGraph.SINEDGE:
String[] parts = currentKey.getColumnQualifier().toString().split(AccumuloGraph.IDDELIM);
case Constants.IN_EDGE:
String[] parts = currentKey.getColumnQualifier().toString().split(Constants.ID_DELIM);
String label = new String(entry.getValue().get());
vertex.prepareEdge(parts[1], parts[0], label, vid);
break;
case AccumuloGraph.SOUTEDGE:
parts = currentKey.getColumnQualifier().toString().split(AccumuloGraph.IDDELIM);
case Constants.OUT_EDGE:
parts = currentKey.getColumnQualifier().toString().split(Constants.ID_DELIM);
label = new String(entry.getValue().get());
vertex.prepareEdge(parts[1], vid, label, parts[0]);
break;
@@ -116,7 +117,7 @@ public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
public static void setAccumuloGraphConfiguration(Job job, AccumuloGraphConfiguration cfg) throws AccumuloSecurityException {
VertexInputFormat.setConnectorInfo(job, cfg.getUser(), new PasswordToken(cfg.getPassword()));
VertexInputFormat.setInputTableName(job, cfg.getVertexTable());
VertexInputFormat.setInputTableName(job, cfg.getVertexTableName());
if (cfg.getInstanceType().equals(InstanceType.Mock)) {
VertexInputFormat.setMockInstance(job, cfg.getInstanceName());
} else {

View File

@@ -0,0 +1,22 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator;
import org.apache.accumulo.core.data.Mutation;
public interface Mutator {
public Iterable<Mutation> create();
}

View File

@@ -0,0 +1,53 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator;
import java.util.LinkedList;
import java.util.List;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Range;
import com.tinkerpop.blueprints.Element;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
public class Mutators {
public static void apply(BatchWriter writer, Mutator mut) {
try {
writer.addMutations(mut.create());
} catch (MutationsRejectedException e) {
throw new AccumuloGraphException(e);
}
}
public static void deleteElementRanges(BatchDeleter deleter, Element... elements) {
List<Range> ranges = new LinkedList<Range>();
for (Element element : elements) {
ranges.add(new Range(element.getId().toString()));
}
deleter.setRanges(ranges);
try {
deleter.delete();
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
}

View File

@@ -0,0 +1,41 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.edge;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.mutator.Mutator;
public abstract class BaseEdgeMutator implements Mutator {
protected final String id;
protected final String outVertexId;
protected final String inVertexId;
protected final String label;
public BaseEdgeMutator(Edge edge) {
this(edge.getId().toString(),
edge.getVertex(Direction.OUT).getId().toString(),
edge.getVertex(Direction.IN).getId().toString(),
edge.getLabel());
}
public BaseEdgeMutator(String id, String outVertexId, String inVertexId, String label) {
this.id = id;
this.outVertexId = outVertexId;
this.inVertexId = inVertexId;
this.label = label;
}
}

View File

@@ -0,0 +1,78 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.edge;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.Constants;
public class EdgeEndpointsMutator {
private EdgeEndpointsMutator() {
}
public static class Add extends BaseEdgeMutator {
public Add(Edge edge) {
super(edge);
}
public Add(String id, String outVertexId, String inVertexId, String label) {
super(id, outVertexId, inVertexId, label);
}
@Override
public Iterable<Mutation> create() {
Mutation in = new Mutation(inVertexId);
in.put(Constants.IN_EDGE.getBytes(),
(outVertexId + Constants.ID_DELIM + id).getBytes(),
(Constants.ID_DELIM + label).getBytes());
Mutation out = new Mutation(outVertexId);
out.put(Constants.OUT_EDGE.getBytes(),
(inVertexId + Constants.ID_DELIM + id).getBytes(),
(Constants.ID_DELIM + label).getBytes());
return Lists.newArrayList(in, out);
}
}
public static class Delete extends BaseEdgeMutator {
public Delete(Edge edge) {
super(edge);
}
public Delete(String id, String outVertexId, String inVertexId, String label) {
super(id, outVertexId, inVertexId, label);
}
@Override
public Iterable<Mutation> create() {
Mutation in = new Mutation(inVertexId);
in.putDelete(Constants.IN_EDGE.getBytes(),
(outVertexId + Constants.ID_DELIM + id).getBytes());
Mutation out = new Mutation(outVertexId);
out.putDelete(Constants.OUT_EDGE.getBytes(),
(inVertexId + Constants.ID_DELIM + id).getBytes());
return Lists.newArrayList(in, out);
}
}
}

View File

@@ -0,0 +1,67 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.edge;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.Constants;
public final class EdgeMutator {
public static class Add extends BaseEdgeMutator {
public Add(Edge edge) {
super(edge);
}
public Add(String id, String outVertexId, String inVertexId, String label) {
super(id, outVertexId, inVertexId, label);
}
@Override
public Iterable<Mutation> create() {
Mutation m = new Mutation(id);
m.put(Constants.LABEL.getBytes(),
(inVertexId + Constants.ID_DELIM + outVertexId).getBytes(),
AccumuloByteSerializer.serialize(label));
return Lists.newArrayList(m);
}
}
public static class Delete extends BaseEdgeMutator {
public Delete(Edge edge) {
super(edge);
}
public Delete(String id, String outVertexId, String inVertexId, String label) {
super(id, outVertexId, inVertexId, label);
}
@Override
public Iterable<Mutation> create() {
Mutation m = new Mutation(id);
m.putDelete(Constants.LABEL.getBytes(),
(inVertexId + Constants.ID_DELIM + outVertexId).getBytes());
return Lists.newArrayList(m);
}
}
}

View File

@@ -0,0 +1,68 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.index;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import com.tinkerpop.blueprints.Element;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.mutator.Mutator;
/**
* Mutators for index metadata table entries.
*/
public class IndexMetadataMutator {
private IndexMetadataMutator() { }
public static class Add implements Mutator {
private final String key;
private final Class<? extends Element> clazz;
public Add(String key, Class<? extends Element> clazz) {
this.key = key;
this.clazz = clazz;
}
@Override
public Iterable<Mutation> create() {
Mutation m = new Mutation(key);
m.put(clazz.getName().getBytes(),
Constants.EMPTY, Constants.EMPTY);
return Lists.newArrayList(m);
}
}
public static class Delete implements Mutator {
private final String key;
private final Class<? extends Element> clazz;
public Delete(String indexName, Class<? extends Element> clazz) {
this.key = indexName;
this.clazz = clazz;
}
@Override
public Iterable<Mutation> create() {
Mutation m = new Mutation(key);
m.putDelete(clazz.getName().getBytes(), Constants.EMPTY);
return Lists.newArrayList(m);
}
}
}

View File

@@ -0,0 +1,75 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.index;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import com.tinkerpop.blueprints.Element;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.mutator.Mutator;
/**
* Mutators for vertex/edge index tables.
*/
public class IndexValueMutator {
private IndexValueMutator() { }
public static class Add implements Mutator {
private final Element element;
private final String key;
private final Object value;
public Add(Element element, String key, Object value) {
this.element = element;
this.key = key;
this.value = value;
}
@Override
public Iterable<Mutation> create() {
byte[] bytes = AccumuloByteSerializer.serialize(value);
Mutation m = new Mutation(bytes);
m.put(key.getBytes(), element.getId().toString()
.getBytes(), Constants.EMPTY);
return Lists.newArrayList(m);
}
}
public static class Delete implements Mutator {
private final Element element;
private final String key;
private final Object value;
public Delete(Element element, String key, Object value) {
this.element = element;
this.key = key;
this.value = value;
}
@Override
public Iterable<Mutation> create() {
byte[] bytes = AccumuloByteSerializer.serialize(value);
Mutation m = new Mutation(bytes);
m.putDelete(key, element.getId().toString());
return Lists.newArrayList(m);
}
}
}

View File

@@ -0,0 +1,28 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.property;
import edu.jhuapl.tinkerpop.mutator.Mutator;
public abstract class BasePropertyMutator implements Mutator {
protected final String id;
protected final String key;
public BasePropertyMutator(String id, String key) {
this.id = id;
this.key = key;
}
}

View File

@@ -0,0 +1,34 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.property;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import edu.jhuapl.tinkerpop.Constants;
public class ClearPropertyMutator extends BasePropertyMutator {
public ClearPropertyMutator(String id, String key) {
super(id, key);
}
@Override
public Iterable<Mutation> create() {
Mutation m = new Mutation(id);
m.putDelete(key.getBytes(), Constants.EMPTY);
return Lists.newArrayList(m);
}
}

View File

@@ -0,0 +1,39 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.property;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.Constants;
public class WritePropertyMutator extends BasePropertyMutator {
private final Object value;
public WritePropertyMutator(String id, String key, Object value) {
super(id, key);
this.value = value;
}
@Override
public Iterable<Mutation> create() {
byte[] bytes = AccumuloByteSerializer.serialize(value);
Mutation m = new Mutation(id);
m.put(key.getBytes(), Constants.EMPTY, bytes);
return Lists.newArrayList(m);
}
}

View File

@@ -0,0 +1,38 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.mutator.vertex;
import org.apache.accumulo.core.data.Mutation;
import com.google.common.collect.Lists;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.mutator.Mutator;
public final class AddVertexMutator implements Mutator {
private final String id;
public AddVertexMutator(String id) {
this.id = id;
}
@Override
public Iterable<Mutation> create() {
Mutation m = new Mutation(id);
m.put(Constants.LABEL.getBytes(),
Constants.EXISTS.getBytes(), Constants.EMPTY);
return Lists.newArrayList(m);
}
}

View File

@@ -0,0 +1,33 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* Edge-specific index parser.
*/
public class EdgeIndexParser extends ElementIndexParser<AccumuloEdge> {
public EdgeIndexParser(GlobalInstances globals) {
super(globals);
}
@Override
protected AccumuloEdge instantiate(String id) {
return new AccumuloEdge(globals, id);
}
}

View File

@@ -0,0 +1,70 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* TODO
*/
public class EdgeParser extends ElementParser<AccumuloEdge> {
public EdgeParser(GlobalInstances globals) {
super(globals);
}
@Override
public AccumuloEdge parse(String id, Iterable<Entry<Key,Value>> entries) {
AccumuloEdge edge = makeEdge(id, entries);
setInMemoryProperties(edge, entries);
return edge;
}
/**
* Make and return an edge object. If the entries
* contain label/endpoint information, set those too.
* @param id
* @param entries
* @return
*/
private AccumuloEdge makeEdge(String id, Iterable<Entry<Key,Value>> entries) {
for (Entry<Key, Value> entry : entries) {
String cf = entry.getKey().getColumnFamily().toString();
if (Constants.LABEL.equals(cf)) {
String cq = entry.getKey().getColumnQualifier().toString();
String[] parts = cq.split(Constants.ID_DELIM);
String inVertexId = parts[0];
String outVertexId = parts[1];
String label = AccumuloByteSerializer.deserialize(entry.getValue().get());
return new AccumuloEdge(globals, id,
new AccumuloVertex(globals, inVertexId),
new AccumuloVertex(globals, outVertexId), label);
}
}
// This should not happen.
throw new AccumuloGraphException("Unable to parse edge from entries");
}
}

View File

@@ -0,0 +1,72 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloElement;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* Parser for elements based on an index table.
*/
public abstract class ElementIndexParser<T extends AccumuloElement>
implements EntryParser<T> {
protected final GlobalInstances globals;
public ElementIndexParser(GlobalInstances globals) {
this.globals = globals;
}
@Override
public T parse(Iterable<Entry<Key, Value>> entries) {
Iterator<Entry<Key, Value>> it = entries.iterator();
if (it.hasNext()) {
Entry<Key, Value> entry = it.next();
if (it.hasNext()) {
throw new AccumuloGraphException("Unexpected multiple entries for index table");
}
String id = entry.getKey().getColumnQualifierData().toString();
T element = instantiate(id);
// While we're here, read the property key/value.
String key = entry.getKey().getColumnFamily().toString();
Object value = AccumuloByteSerializer.deserialize(entry.getKey().getRow().getBytes());
element.setPropertyInMemory(key, value);
return element;
}
else {
throw new AccumuloGraphException("No index table entries found");
}
}
/**
* Instantiate an object to be returned.
* @param id
* @return
*/
protected abstract T instantiate(String id);
}

View File

@@ -0,0 +1,64 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloElement;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* TODO
*/
public abstract class ElementParser<T extends AccumuloElement> implements EntryParser<T> {
protected GlobalInstances globals;
public ElementParser(GlobalInstances globals) {
this.globals = globals;
}
@Override
public T parse(Iterable<Entry<Key, Value>> entries) {
String id = entries.iterator().next().getKey().getRow().toString();
return parse(id, entries);
}
/**
* Given the element id and set of entries,
* create the type of element. This can leverage
* the property loader, etc.
* @param id
* @param entries
* @return
*/
public abstract T parse(String id, Iterable<Entry<Key, Value>> entries);
/**
* Parse out the property entries and set them for the given element.
* @param element
* @param entries
*/
protected void setInMemoryProperties(T element, Iterable<Entry<Key, Value>> entries) {
Map<String, Object> props = new PropertyParser().parse(entries);
for (String key : props.keySet()) {
element.setPropertyInMemory(key, props.get(key));
}
}
}

View File

@@ -0,0 +1,28 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
/**
* TODO
*/
public interface EntryParser<T> {
public T parse(Iterable<Entry<Key, Value>> entries);
}

View File

@@ -0,0 +1,44 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.IndexableGraph;
import com.tinkerpop.blueprints.KeyIndexableGraph;
/**
* An indexed item. For {@link IndexableGraph},
* the key is the index name. For {@link KeyIndexableGraph}
* the key is the indexed key.
* @author Michael Lieberman
*
*/
public class IndexedItem {
private final String key;
private final Class<? extends Element> elementClass;
public IndexedItem(String key, Class<? extends Element> elementClass) {
this.key = key;
this.elementClass = elementClass;
}
public String getKey() {
return key;
}
public Class<? extends Element> getElementClass() {
return elementClass;
}
}

View File

@@ -0,0 +1,91 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.IndexableGraph;
import com.tinkerpop.blueprints.KeyIndexableGraph;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
/**
* Entry parser for index metadata. The format
* is the same for both {@link IndexableGraph}
* and {@link KeyIndexableGraph} functionality.
* For the former, this parser returns names of
* indexes. For the latter, the parser returns
* indexed keys.
*/
public class IndexedItemsListParser implements EntryParser<List<IndexedItem>> {
private final Class<? extends Element> elementClass;
/**
* Constructor to return all items regardless
* of element class.
*/
public IndexedItemsListParser() {
this(Element.class);
}
/**
* Create a parser for items of the specified element class.
* This may be Vertex, Edge, or Element.
* @param elementClass
*/
public IndexedItemsListParser(Class<? extends Element> elementClass) {
// Validate element class.
if (!Vertex.class.equals(elementClass) &&
!Edge.class.equals(elementClass) &&
!Element.class.equals(elementClass)) {
throw new IllegalArgumentException("elementClass must be Vertex, Edge or Element");
}
this.elementClass = elementClass;
}
@SuppressWarnings("unchecked")
@Override
public List<IndexedItem> parse(Iterable<Entry<Key,Value>> entries) {
List<IndexedItem> items = new ArrayList<IndexedItem>();
for (Entry<Key, Value> entry : entries) {
Class<? extends Element> clazz;
try {
clazz = (Class<? extends Element>) Class.forName(entry.getKey()
.getColumnFamily().toString());
} catch (ClassNotFoundException e) {
throw new AccumuloGraphException(e);
}
if (Element.class.equals(elementClass) ||
elementClass.equals(clazz)) {
IndexedItem item = new IndexedItem(entry.getKey()
.getRow().toString(), clazz);
items.add(item);
}
}
return items;
}
}

View File

@@ -0,0 +1,66 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.Constants;
/**
* TODO
*/
public class PropertyParser implements EntryParser<Map<String, Object>> {
@Override
public Map<String,Object> parse(Iterable<Entry<Key,Value>> entries) {
Map<String, Object> props = null;
for (Entry<Key, Value> entry : entries) {
if (props == null) {
props = new HashMap<String, Object>();
}
Key key = entry.getKey();
if (!isMetaKey(key)) {
String attr = key.getColumnFamily().toString();
Object value = AccumuloByteSerializer.deserialize(entry.getValue().get());
props.put(attr, value);
}
}
return props;
}
/**
* Test whether the given Accumulo key represents a
* metadata key (e.g. existence, edge endpoint, etc),
* rather than a property.
* @param key
* @return
*/
private static boolean isMetaKey(Key key) {
String cf = key.getColumnFamily().toString();
return Constants.LABEL.equals(cf) ||
Constants.IN_EDGE.equals(cf) ||
Constants.OUT_EDGE.equals(cf);
}
}

View File

@@ -0,0 +1,33 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* Vertex-specific index parser.
*/
public class VertexIndexParser extends ElementIndexParser<AccumuloVertex> {
public VertexIndexParser(GlobalInstances globals) {
super(globals);
}
@Override
protected AccumuloVertex instantiate(String id) {
return new AccumuloVertex(globals, id);
}
}

View File

@@ -12,29 +12,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
package edu.jhuapl.tinkerpop.parser;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
public enum EntryLocation {
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.GlobalInstances;
Row, ColF, ColQ, Value;
/**
* TODO
*/
public class VertexParser extends ElementParser<AccumuloVertex> {
public String extract(Entry<Key,Value> entry) {
switch (this) {
case Row:
return entry.getKey().getRow().toString();
case ColF:
return entry.getKey().getColumnFamily().toString();
case ColQ:
return entry.getKey().getColumnQualifier().toString();
case Value:
return new String(entry.getValue().get());
default:
throw new AccumuloGraphException("Unexpected type: " + this);
}
public VertexParser(GlobalInstances globals) {
super(globals);
}
@Override
public AccumuloVertex parse(String id, Iterable<Entry<Key,Value>> entries) {
AccumuloVertex vertex = new AccumuloVertex(globals, id);
setInMemoryProperties(vertex, entries);
return vertex;
}
}

View File

@@ -0,0 +1,178 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.CloseableIterable;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.IndexableGraph;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloElement;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.AccumuloGraphUtils;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.index.IndexValueMutator;
import edu.jhuapl.tinkerpop.parser.EdgeIndexParser;
import edu.jhuapl.tinkerpop.parser.ElementIndexParser;
import edu.jhuapl.tinkerpop.parser.VertexIndexParser;
/**
* Wrapper around index tables containing properties
* and values.
*/
public abstract class BaseIndexValuesTableWrapper extends BaseTableWrapper {
protected final Class<? extends Element> elementType;
protected BaseIndexValuesTableWrapper(GlobalInstances globals,
Class<? extends Element> elementType, String tableName) {
super(globals, tableName);
this.elementType = elementType;
}
/**
* Return class of this index.
* @return
*/
public Class<? extends Element> getElementType() {
return elementType;
}
/**
* Add the property to this index, if autoindexing is enabled
* and/or the given key has indexing enabled.
* @param element
* @param key
* @param value
*/
public void setPropertyForIndex(Element element, String key, Object value) {
setPropertyForIndex(element, key, value, false);
}
/**
* Add the property to this index.
*
* <p/>Note that this requires a round-trip to Accumulo to see
* if the property exists if the provided key has an index.
* So for best performance, create indices after bulk ingest.
* <p/>If the force parameter is true, set the property regardless
* of whether indexing is enabled for the given key. This is needed
* for {@link IndexableGraph} operations.
* @param element
* @param key
* @param value
* @param force
*/
public void setPropertyForIndex(Element element, String key, Object value,
boolean force) {
AccumuloGraphUtils.validateProperty(key, value);
if (force || globals.getConfig().getAutoIndex() ||
globals.getIndexedKeysListWrapper()
.getIndexedKeys(elementType).contains(key)) {
BatchWriter writer = getWriter();
Object oldValue = element.getProperty(key);
if (oldValue != null && !oldValue.equals(value)) {
Mutators.apply(writer, new IndexValueMutator.Delete(element, key, oldValue));
}
Mutators.apply(writer, new IndexValueMutator.Add(element, key, value));
globals.checkedFlush();
}
}
/**
* Remove property from the index.
* @param element
* @param key
* @param value
*/
public void removePropertyFromIndex(Element element, String key, Object value) {
if (value != null) {
Mutators.apply(getWriter(), new IndexValueMutator.Delete(element, key, value));
globals.checkedFlush();
}
}
/**
* Get elements with the key/value pair.
* @param key
* @param value
* @return
*/
@SuppressWarnings("unchecked")
public <T extends Element> CloseableIterable<T> readElementsFromIndex(String key, Object value) {
Scanner scan = getScanner();
byte[] id = AccumuloByteSerializer.serialize(value);
scan.setRange(Range.exact(new Text(id)));
scan.fetchColumnFamily(new Text(key));
final ElementIndexParser<? extends AccumuloElement> parser =
Vertex.class.equals(elementType) ? new VertexIndexParser(globals) :
new EdgeIndexParser(globals);
return new ScannerIterable<T>(scan) {
@Override
public T next(PeekingIterator<Entry<Key,Value>> iterator) {
return (T) parser.parse(Arrays.asList(iterator.next()));
}
};
}
/**
* Remove the given element's properties from the index.
* @param element
*/
public void removeElementFromIndex(Element element) {
BatchDeleter deleter = null;
try {
deleter = getDeleter();
deleter.setRanges(Collections.singleton(new Range()));
IteratorSetting is = new IteratorSetting(10, "getEdgeFilter", RegExFilter.class);
RegExFilter.setRegexs(is, null, null,
"^"+Pattern.quote(element.getId().toString())+"$", null, false);
deleter.addScanIterator(is);
deleter.delete();
deleter.close();
} catch (Exception e) {
throw new AccumuloGraphException(e);
} finally {
if (deleter != null) {
deleter.close();
}
}
}
}

View File

@@ -0,0 +1,42 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables;
import com.tinkerpop.blueprints.Element;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.index.IndexMetadataMutator;
/**
* Wraps the metadata tables which stores information
* about which property keys are indexed for different
* graph types.
*/
public abstract class BaseIndexedItemsListTableWrapper extends BaseTableWrapper {
protected BaseIndexedItemsListTableWrapper(GlobalInstances globals,
String tableName) {
super(globals, tableName);
}
protected void writeEntry(String key, Class<? extends Element> clazz) {
Mutators.apply(getWriter(), new IndexMetadataMutator.Add(key, clazz));
}
protected void clearEntry(String key, Class<? extends Element> clazz) {
Mutators.apply(getWriter(), new IndexMetadataMutator.Delete(key, clazz));
}
}

View File

@@ -0,0 +1,90 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables;
import java.util.Collections;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* Table wrapper with common functionality.
*/
public abstract class BaseTableWrapper {
protected GlobalInstances globals;
private String tableName;
public BaseTableWrapper(GlobalInstances globals, String tableName) {
this.globals = globals;
this.tableName = tableName;
}
protected Scanner getScanner() {
try {
return globals.getConfig().getConnector().createScanner(tableName,
globals.getConfig().getAuthorizations());
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
protected BatchScanner getBatchScanner() {
try {
BatchScanner scanner = globals.getConfig().getConnector().createBatchScanner(tableName,
globals.getConfig().getAuthorizations(), globals.getConfig().getQueryThreads());
scanner.setRanges(Collections.singletonList(new Range()));
return scanner;
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
protected BatchWriter getWriter() {
try {
return globals.getMtbw().getBatchWriter(tableName);
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
protected BatchDeleter getDeleter() {
try {
return globals.getConfig().getConnector().createBatchDeleter(tableName,
globals.getConfig().getAuthorizations(), globals.getConfig().getMaxWriteThreads(),
globals.getConfig().getBatchWriterConfig());
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
public void dump() {
System.out.println("Dump of table "+tableName+":");
Scanner s = getScanner();
for (Entry<Key, Value> entry : s) {
System.out.println(" "+entry);
}
}
}

View File

@@ -0,0 +1,176 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.core;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.AccumuloGraphUtils;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.edge.EdgeMutator;
import edu.jhuapl.tinkerpop.parser.EdgeParser;
/**
* Wrapper around {@link Edge} tables.
*/
public class EdgeTableWrapper extends ElementTableWrapper {
public EdgeTableWrapper(GlobalInstances globals) {
super(globals, globals.getConfig().getEdgeTableName());
}
/**
* Write the given edge to the edge table. Does not
* currently write the edge's properties.
*
* <p/>Note: This only adds the edge information. Vertex
* endpoint information needs to be written to the vertex
* table via {@link VertexTableWrapper}.
* @param edge
*/
public void writeEdge(Edge edge) {
Mutators.apply(getWriter(), new EdgeMutator.Add(edge));
globals.checkedFlush();
}
public void deleteEdge(Edge edge) {
Mutators.apply(getWriter(), new EdgeMutator.Delete(edge));
globals.checkedFlush();
}
public Iterable<Edge> getEdges() {
Scanner scan = getScanner();
scan.fetchColumnFamily(new Text(Constants.LABEL));
if (globals.getConfig().getPreloadedProperties() != null) {
for (String key : globals.getConfig().getPreloadedProperties()) {
scan.fetchColumnFamily(new Text(key));
}
}
final EdgeParser parser = new EdgeParser(globals);
return new ScannerIterable<Edge>(scan) {
@Override
public Edge next(PeekingIterator<Entry<Key, Value>> iterator) {
// TODO could also check local cache before creating a new instance?
String rowId = iterator.peek().getKey().getRow().toString();
List<Entry<Key, Value>> entries =
new ArrayList<Entry<Key, Value>>();
// MDL 05 Jan 2014: Why is this equalsIgnoreCase??
while (iterator.peek() != null && rowId.equalsIgnoreCase(iterator
.peek().getKey().getRow().toString())) {
entries.add(iterator.next());
}
AccumuloEdge edge = parser.parse(rowId, entries);
globals.getCaches().cache(edge, Edge.class);
return edge;
}
};
}
public Iterable<Edge> getEdges(String key, Object value) {
AccumuloGraphUtils.nullCheckProperty(key, value);
if (key.equalsIgnoreCase("label")) {
key = Constants.LABEL;
}
BatchScanner scan = getBatchScanner();
scan.fetchColumnFamily(new Text(key));
byte[] val = AccumuloByteSerializer.serialize(value);
if (val[0] != AccumuloByteSerializer.SERIALIZABLE) {
IteratorSetting is = new IteratorSetting(10, "filter", RegExFilter.class);
RegExFilter.setRegexs(is, null, null, null, Pattern.quote(new String(val)), false);
scan.addScanIterator(is);
return new ScannerIterable<Edge>(scan) {
@Override
public Edge next(PeekingIterator<Entry<Key,Value>> iterator) {
Key k = iterator.next().getKey();
if (k.getColumnFamily().toString().equals(Constants.LABEL)) {
String[] vals = k.getColumnQualifier().toString().split(Constants.ID_DELIM);
return new AccumuloEdge(globals, k.getRow().toString(),
new AccumuloVertex(globals, vals[0]),
new AccumuloVertex(globals, vals[1]), null);
}
return new AccumuloEdge(globals, k.getRow().toString());
}
};
} else {
// TODO
throw new UnsupportedOperationException("Filtering on binary data not currently supported.");
}
}
public void loadEndpointsAndLabel(AccumuloEdge edge) {
Scanner s = getScanner();
try {
s.setRange(new Range(edge.getId().toString()));
s.fetchColumnFamily(new Text(Constants.LABEL));
Iterator<Entry<Key,Value>> iter = s.iterator();
if (!iter.hasNext()) {
dump();
throw new AccumuloGraphException("Unable to find edge row: "+edge);
}
Entry<Key, Value> entry = iter.next();
String cq = entry.getKey().getColumnQualifier().toString();
String[] ids = cq.split(Constants.ID_DELIM);
String label = AccumuloByteSerializer.deserialize(entry.getValue().get());
edge.setVertices(new AccumuloVertex(globals, ids[0]),
new AccumuloVertex(globals, ids[1]));
edge.setLabel(label);
} finally {
s.close();
}
}
}

View File

@@ -0,0 +1,226 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.core;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.util.StringFactory;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.mutator.property.ClearPropertyMutator;
import edu.jhuapl.tinkerpop.mutator.property.WritePropertyMutator;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.parser.PropertyParser;
import edu.jhuapl.tinkerpop.tables.BaseTableWrapper;
/**
* Wrapper around tables with operations
* common to {@link Element}s.
*/
public abstract class ElementTableWrapper extends BaseTableWrapper {
private BatchWriter writer;
public ElementTableWrapper(GlobalInstances globals, String tableName) {
super(globals, tableName);
writer = super.getWriter();
}
/**
* Give a single instance of the writer for this table.
*/
@Override
protected BatchWriter getWriter() {
return writer;
}
/**
* Read the given property from the backing table
* for the given element id.
* @param id
* @param key
* @return
*/
public <V> V readProperty(Element element, String key) {
Scanner s = getScanner();
s.setRange(new Range(element.getId().toString()));
Text colf = StringFactory.LABEL.equals(key)
? new Text(Constants.LABEL) : new Text(key);
s.fetchColumnFamily(colf);
V value = null;
Iterator<Entry<Key, Value>> iter = s.iterator();
if (iter.hasNext()) {
value = AccumuloByteSerializer.deserialize(iter.next().getValue().get());
}
s.close();
return value;
}
/**
* Read all properties for the given element
* from the backing table.
* If the element has no properties, return an empty Map.
* If the element does not exist, return null.
* @param element
* @return
*/
public Map<String, Object> readAllProperties(Element element) {
return readProperties(element, null);
}
/**
* Read the given properties for the given element.
* If propertyKeys is null, read all properties.
* If the element has no properties, return an empty Map.
* If the element does not exist, return null.
* @param id
* @param propertyKeys
* @return
*/
public Map<String, Object> readProperties(Element element, String[] propertyKeys) {
Scanner s = getScanner();
s.setRange(Range.exact((String) element.getId()));
// If propertyKeys is null, we read everything.
// Otherwise, limit to the given attributes.
if (propertyKeys != null) {
s.fetchColumnFamily(new Text(Constants.LABEL));
for (String key : propertyKeys) {
s.fetchColumnFamily(new Text(key));
}
}
Map<String, Object> props = new PropertyParser().parse(s);
s.close();
return props;
}
/**
* Return true if the element with given id exists.
* @param id
* @return
*/
public boolean elementExists(String id) {
Scanner scan = null;
try {
scan = getScanner();
scan.setRange(Range.exact(id));
scan.fetchColumnFamily(new Text(Constants.LABEL));
return new PropertyParser().parse(scan) != null;
} finally {
if (scan != null) {
scan.close();
}
}
}
/**
* Get all property keys for the given element id.
* @param id
* @return
*/
public Set<String> readPropertyKeys(Element element) {
Scanner s = getScanner();
s.setRange(new Range(element.getId().toString()));
Set<String> keys = new HashSet<String>();
for (Entry<Key, Value> entry : s) {
String cf = entry.getKey().getColumnFamily().toString();
keys.add(cf);
}
s.close();
// Remove some special keys.
keys.remove(Constants.IN_EDGE);
keys.remove(Constants.LABEL);
keys.remove(Constants.OUT_EDGE);
return keys;
}
/**
* Delete the property entry from property table.
* @param id
* @param key
*/
public void clearProperty(Element element, String key) {
Mutators.apply(getWriter(),
new ClearPropertyMutator(element.getId().toString(), key));
globals.checkedFlush();
}
/**
* Write the given property to the property table.
* @param id
* @param key
* @param value
*/
public void writeProperty(Element element, String key, Object value) {
Mutators.apply(getWriter(),
new WritePropertyMutator(element.getId().toString(),
key, value));
globals.checkedFlush();
}
/**
* Add custom iterator to the given scanner so that
* it will only return keys with value corresponding to an edge.
* @param scan
* @param labels
*/
protected void applyEdgeLabelValueFilter(Scanner scan, String... labels) {
StringBuilder regex = new StringBuilder();
for (String lab : labels) {
if (regex.length() != 0)
regex.append("|");
regex.append(".*"+Constants.ID_DELIM+"\\Q").append(lab).append("\\E$");
}
IteratorSetting is = new IteratorSetting(10, "edgeValueFilter", RegExFilter.class);
RegExFilter.setRegexs(is, null, null, null, regex.toString(), false);
scan.addScanIterator(is);
}
public void close() {
// TODO?
}
}

View File

@@ -0,0 +1,262 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.AccumuloElement;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.AccumuloGraphUtils;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.Constants;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.mutator.vertex.AddVertexMutator;
import edu.jhuapl.tinkerpop.mutator.Mutator;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.edge.EdgeEndpointsMutator;
import edu.jhuapl.tinkerpop.parser.VertexParser;
/**
* Wrapper around {@link Vertex} tables.
*/
public class VertexTableWrapper extends ElementTableWrapper {
public VertexTableWrapper(GlobalInstances globals) {
super(globals, globals.getConfig().getVertexTableName());
}
/**
* Write a vertex with the given id.
* Note: This does not currently write the vertex's properties.
* @param vertex
*/
public void writeVertex(Vertex vertex) {
Mutators.apply(getWriter(), new AddVertexMutator(vertex.getId().toString()));
globals.checkedFlush();
}
/**
* Remove the given vertex.
* Note: This uses a BatchDeleter rather than {@link Mutator}
* because it is more efficient.
* @param vertex
*/
public void deleteVertex(Vertex vertex) {
BatchDeleter deleter = null;
try {
deleter = getDeleter();
deleter.setRanges(Arrays.asList(Range.exact((String) vertex.getId())));
deleter.delete();
} catch (Exception e) {
throw new AccumuloGraphException(e);
} finally {
if (deleter != null) {
deleter.close();
}
}
}
/**
* Write edge endpoint information to the vertex table.
* @param edge
*/
public void writeEdgeEndpoints(Edge edge) {
Mutators.apply(getWriter(), new EdgeEndpointsMutator.Add(edge));
globals.checkedFlush();
}
public void deleteEdgeEndpoints(Edge edge) {
Mutators.apply(getWriter(), new EdgeEndpointsMutator.Delete(edge));
globals.checkedFlush();
}
public Iterable<Edge> getEdges(Vertex vertex, Direction direction,
String... labels) {
Scanner scan = getScanner();
scan.setRange(new Range(vertex.getId().toString()));
if (direction.equals(Direction.IN)) {
scan.fetchColumnFamily(new Text(Constants.IN_EDGE));
} else if (direction.equals(Direction.OUT)) {
scan.fetchColumnFamily(new Text(Constants.OUT_EDGE));
} else {
scan.fetchColumnFamily(new Text(Constants.IN_EDGE));
scan.fetchColumnFamily(new Text(Constants.OUT_EDGE));
}
if (labels.length > 0) {
applyEdgeLabelValueFilter(scan, labels);
}
return new ScannerIterable<Edge>(scan) {
@Override
public Edge next(PeekingIterator<Entry<Key,Value>> iterator) {
// TODO better use of information readily available...
// TODO could also check local cache before creating a new
// instance?
Entry<Key,Value> kv = iterator.next();
String[] parts = kv.getKey().getColumnQualifier().toString().split(Constants.ID_DELIM);
String label = (new String(kv.getValue().get())).split(Constants.ID_DELIM)[1];
AccumuloEdge edge;
if (kv.getKey().getColumnFamily().toString().equalsIgnoreCase(Constants.IN_EDGE)) {
edge = new AccumuloEdge(globals, parts[1],
new AccumuloVertex(globals, kv.getKey().getRow().toString()),
new AccumuloVertex(globals, parts[0]), label);
} else {
edge = new AccumuloEdge(globals, parts[1],
new AccumuloVertex(globals, parts[0]),
new AccumuloVertex(globals, kv.getKey().getRow().toString()), label);
}
globals.getCaches().cache(edge, Edge.class);
return edge;
}
};
}
public Iterable<Vertex> getVertices(Vertex vertex, Direction direction, String... labels) {
Scanner scan = getScanner();
scan.setRange(new Range(vertex.getId().toString()));
if (direction.equals(Direction.IN)) {
scan.fetchColumnFamily(new Text(Constants.IN_EDGE));
} else if (direction.equals(Direction.OUT)) {
scan.fetchColumnFamily(new Text(Constants.OUT_EDGE));
} else {
scan.fetchColumnFamily(new Text(Constants.IN_EDGE));
scan.fetchColumnFamily(new Text(Constants.OUT_EDGE));
}
if (labels != null && labels.length > 0) {
applyEdgeLabelValueFilter(scan, labels);
}
return new ScannerIterable<Vertex>(scan) {
@Override
public Vertex next(PeekingIterator<Entry<Key,Value>> iterator) {
// TODO better use of information readily available...
// TODO could also check local cache before creating a new
// instance?
String[] parts = iterator.next().getKey().getColumnQualifier()
.toString().split(Constants.ID_DELIM);
AccumuloVertex vertex = new AccumuloVertex(globals, parts[0]);
globals.getCaches().cache(vertex, Vertex.class);
return vertex;
}
};
}
public Iterable<Vertex> getVertices() {
Scanner scan = getScanner();
scan.fetchColumnFamily(new Text(Constants.LABEL));
if (globals.getConfig().getPreloadedProperties() != null) {
for (String key : globals.getConfig().getPreloadedProperties()) {
scan.fetchColumnFamily(new Text(key));
}
}
final VertexParser parser = new VertexParser(globals);
return new ScannerIterable<Vertex>(scan) {
@Override
public Vertex next(PeekingIterator<Entry<Key, Value>> iterator) {
// TODO could also check local cache before creating a new instance?
String rowId = iterator.peek().getKey().getRow().toString();
List<Entry<Key, Value>> entries =
new ArrayList<Entry<Key, Value>>();
while (iterator.peek() != null && rowId.equals(iterator
.peek().getKey().getRow().toString())) {
entries.add(iterator.next());
}
AccumuloVertex vertex = parser.parse(rowId, entries);
globals.getCaches().cache(vertex, Vertex.class);
return vertex;
}
};
}
public Iterable<Vertex> getVertices(String key, Object value) {
AccumuloGraphUtils.validateProperty(key, value);
byte[] val = AccumuloByteSerializer.serialize(value);
if (val[0] != AccumuloByteSerializer.SERIALIZABLE) {
BatchScanner scan = getBatchScanner();
scan.fetchColumnFamily(new Text(key));
IteratorSetting is = new IteratorSetting(10, "filter", RegExFilter.class);
RegExFilter.setRegexs(is, null, null, null, Pattern.quote(new String(val)), false);
scan.addScanIterator(is);
return new ScannerIterable<Vertex>(scan) {
@Override
public Vertex next(PeekingIterator<Entry<Key,Value>> iterator) {
Entry<Key, Value> kv = iterator.next();
String key = kv.getKey().getColumnFamily().toString();
Object value = AccumuloByteSerializer.deserialize(kv.getValue().get());
Vertex v = globals.getCaches().retrieve(kv.getKey().getRow().toString(), Vertex.class);
if (v == null) {
v = new AccumuloVertex(globals, kv.getKey().getRow().toString());
}
((AccumuloElement) v).setPropertyInMemory(key, value);
globals.getCaches().cache(v, Vertex.class);
return v;
}
};
} else {
// TODO
throw new UnsupportedOperationException("Filtering on binary data not currently supported.");
}
}
}

View File

@@ -0,0 +1,73 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.keyindex;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.tables.BaseIndexValuesTableWrapper;
import edu.jhuapl.tinkerpop.tables.core.EdgeTableWrapper;
import edu.jhuapl.tinkerpop.tables.core.ElementTableWrapper;
import edu.jhuapl.tinkerpop.tables.core.VertexTableWrapper;
/**
* Base class for key index tables.
*/
public class BaseKeyIndexTableWrapper extends BaseIndexValuesTableWrapper {
protected BaseKeyIndexTableWrapper(GlobalInstances globals,
Class<? extends Element> elementType, String tableName) {
super(globals, elementType, tableName);
}
/**
* Rebuild this index for the given table.
* @param table
* @param key
*/
public void rebuildIndex(String key, Class<? extends Element> elementClass) {
ElementTableWrapper wrapper = globals.getElementWrapper(elementClass);
if (wrapper instanceof VertexTableWrapper) {
for (Vertex v : ((VertexTableWrapper) wrapper).getVertices()) {
rebuild(wrapper, v, key);
}
}
else if (wrapper instanceof EdgeTableWrapper) {
for (Edge e : ((EdgeTableWrapper) wrapper).getEdges()) {
rebuild(wrapper, e, key);
}
}
else {
throw new AccumuloGraphException("Unexpected table wrapper: "+wrapper.getClass());
}
globals.checkedFlush();
}
/**
* Add given element to index for the given key.
* @param element
* @param key
*/
private void rebuild(ElementTableWrapper wrapper,
Element element, String key) {
Object value = wrapper.readProperty(element, key);
if (value != null) {
setPropertyForIndex(element, key, value);
}
}
}

View File

@@ -0,0 +1,86 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.keyindex;
import java.util.Arrays;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.parser.EdgeIndexParser;
/**
* Wrapper around {@link Edge} index table.
*/
public class EdgeKeyIndexTableWrapper extends BaseKeyIndexTableWrapper {
public EdgeKeyIndexTableWrapper(GlobalInstances globals) {
super(globals, Edge.class, globals.getConfig()
.getEdgeKeyIndexTableName());
}
/**
* Retrieve edges from the index table based
* on the given key/value.
* @param key
* @param value
* @return
*/
public Iterable<Edge> getEdges(String key, Object value) {
Scanner s = getScanner();
Text row = new Text(AccumuloByteSerializer.serialize(value));
s.setRange(Range.exact(row));
s.fetchColumnFamily(new Text(key));
final EdgeIndexParser parser = new EdgeIndexParser(globals);
return new ScannerIterable<Edge>(s) {
@Override
public Edge next(PeekingIterator<Entry<Key, Value>> iterator) {
Entry<Key, Value> entry = iterator.next();
AccumuloEdge e = parser.parse(Arrays.asList(entry));
// Check if we have it cached already, in which
// case use the cached version.
AccumuloEdge cached = (AccumuloEdge) globals.getCaches()
.retrieve(e.getId(), Edge.class);
if (cached != null) {
for (String key : e.getPropertyKeysInMemory()) {
cached.setPropertyInMemory(key, e.getPropertyInMemory(key));
}
return cached;
}
// We don't have it, so cache the new one and return it.
globals.getCaches().cache(e, Edge.class);
return e;
}
};
}
}

View File

@@ -0,0 +1,73 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.keyindex;
import java.util.HashSet;
import java.util.Set;
import org.apache.accumulo.core.client.Scanner;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.util.ExceptionFactory;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.parser.IndexedItem;
import edu.jhuapl.tinkerpop.parser.IndexedItemsListParser;
import edu.jhuapl.tinkerpop.tables.BaseIndexedItemsListTableWrapper;
/**
* Wraps the metadata tables which stores information
* about which property keys are indexed for different
* graph types.
*/
public class IndexedKeysListTableWrapper extends BaseIndexedItemsListTableWrapper {
public IndexedKeysListTableWrapper(GlobalInstances globals) {
super(globals, globals.getConfig().getIndexedKeysTableName());
}
public void writeKeyMetadataEntry(String key, Class<? extends Element> clazz) {
writeEntry(key, clazz);
}
public void clearKeyMetadataEntry(String key, Class<? extends Element> clazz) {
clearEntry(key, clazz);
}
public <T extends Element> Set<String> getIndexedKeys(Class<T> elementClass) {
if (elementClass == null) {
throw ExceptionFactory.classForElementCannotBeNull();
}
IndexedItemsListParser parser = new IndexedItemsListParser(elementClass);
Scanner scan = null;
try {
scan = getScanner();
Set<String> keys = new HashSet<String>();
for (IndexedItem item : parser.parse(scan)) {
keys.add(item.getKey());
}
return keys;
} finally {
if (scan != null) {
scan.close();
}
}
}
}

View File

@@ -0,0 +1,85 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.keyindex;
import java.util.Arrays;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.parser.VertexIndexParser;
/**
* Wrapper around {@link Vertex} index table.
*/
public class VertexKeyIndexTableWrapper extends BaseKeyIndexTableWrapper {
public VertexKeyIndexTableWrapper(GlobalInstances globals) {
super(globals, Vertex.class, globals.getConfig()
.getVertexKeyIndexTableName());
}
/**
* Use the index to retrieve vertices with the
* given key/value.
* @param key
* @param value
*/
public Iterable<Vertex> getVertices(String key, Object value) {
Scanner s = getScanner();
Text row = new Text(AccumuloByteSerializer.serialize(value));
s.setRange(Range.exact(row));
s.fetchColumnFamily(new Text(key));
final VertexIndexParser parser = new VertexIndexParser(globals);
return new ScannerIterable<Vertex>(s) {
@Override
public Vertex next(PeekingIterator<Entry<Key, Value>> iterator) {
Entry<Key, Value> entry = iterator.next();
AccumuloVertex v = parser.parse(Arrays.asList(entry));
// Check if we have it cached already, in which
// case use the cached version.
AccumuloVertex cached = (AccumuloVertex) globals.getCaches()
.retrieve(v.getId(), Vertex.class);
if (cached != null) {
for (String key : v.getPropertyKeysInMemory()) {
cached.setPropertyInMemory(key, v.getPropertyInMemory(key));
}
return cached;
}
// We don't have it, so cache the new one and return it.
globals.getCaches().cache(v, Vertex.class);
return v;
}
};
}
}

View File

@@ -0,0 +1,112 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.namedindex;
import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.client.Scanner;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Index;
import com.tinkerpop.blueprints.util.ExceptionFactory;
import edu.jhuapl.tinkerpop.AccumuloIndex;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.parser.IndexedItem;
import edu.jhuapl.tinkerpop.parser.IndexedItemsListParser;
import edu.jhuapl.tinkerpop.tables.BaseIndexedItemsListTableWrapper;
/**
* Wrapper around index metadata table. This lists
* names of indexes and their element types.
*/
public class NamedIndexListTableWrapper extends BaseIndexedItemsListTableWrapper {
public NamedIndexListTableWrapper(GlobalInstances globals) {
super(globals, globals.getConfig().getIndexNamesTableName());
}
public void writeIndexNameEntry(String indexName,
Class<? extends Element> indexClass) {
writeEntry(indexName, indexClass);
}
public void clearIndexNameEntry(String indexName,
Class<? extends Element> indexClass) {
clearEntry(indexName, indexClass);
}
@SuppressWarnings({"rawtypes", "unchecked"})
public Iterable<Index<? extends Element>> getIndices() {
List<Index<? extends Element>> indexes = new ArrayList<Index<? extends Element>>();
IndexedItemsListParser parser = new IndexedItemsListParser();
Scanner scan = null;
try {
scan = getScanner();
for (IndexedItem item : parser.parse(scan)) {
indexes.add(new AccumuloIndex(globals,
item.getKey(), item.getElementClass()));
}
return indexes;
} finally {
if (scan != null) {
scan.close();
}
}
}
public <T extends Element> Index<T> getIndex(String indexName,
Class<T> indexClass) {
IndexedItemsListParser parser = new IndexedItemsListParser();
Scanner scan = null;
try {
scan = getScanner();
for (IndexedItem item : parser.parse(scan)) {
if (item.getKey().equals(indexName)) {
if (item.getElementClass().equals(indexClass)) {
return new AccumuloIndex<T>(globals, indexName,
indexClass);
}
else {
throw ExceptionFactory.indexDoesNotSupportClass(indexName, indexClass);
}
}
}
return null;
} finally {
scan.close();
}
}
public <T extends Element> Index<T> createIndex(String indexName, Class<T> indexClass) {
for (Index<?> index : globals.getNamedIndexListWrapper().getIndices()) {
if (index.getIndexName().equals(indexName)) {
throw ExceptionFactory.indexAlreadyExists(indexName);
}
}
writeIndexNameEntry(indexName, indexClass);
return new AccumuloIndex<T>(globals, indexName, indexClass);
}
}

View File

@@ -0,0 +1,33 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.tables.namedindex;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.IndexableGraph;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.tables.BaseIndexValuesTableWrapper;
/**
* Wrapper around a named index table (for {@link IndexableGraph}).
*/
public class NamedIndexTableWrapper extends BaseIndexValuesTableWrapper {
public NamedIndexTableWrapper(GlobalInstances globals,
Class<? extends Element> elementType, String indexName) {
super(globals, elementType,
globals.getConfig().getNamedIndexTableName(indexName));
}
}

View File

@@ -0,0 +1,6 @@
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c - %m%n

View File

@@ -88,6 +88,7 @@ public class AccumuloGraphTest extends GraphTest {
printTestPerformance("GraphSONReaderTestSuite", this.stopWatch());
}
@Override
public void doTestSuite(final TestSuite testSuite) throws Exception {
String doTest = System.getProperty("testTinkerGraph");
if (doTest == null || doTest.equals("true")) {
@@ -112,12 +113,21 @@ public class AccumuloGraphTest extends GraphTest {
new AccumuloGraphTest().generateGraph();
}
@Override
public void dropGraph(final String graphDirectoryName) {
if (graphDirectoryName != null) {
((AccumuloGraph) generateGraph(graphDirectoryName)).clear();
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils.generateGraphConfig(graphDirectoryName);
try {
for (String table : cfg.getConnector().tableOperations().list()) {
cfg.getConnector().tableOperations().delete(table);
}
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
}
@Override
public Object convertId(final Object id) {
return id.toString();
}

View File

@@ -22,14 +22,11 @@ import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
public class AccumuloGraphTestUtils {
public static AccumuloGraphConfiguration generateGraphConfig(String graphDirectoryName) {
AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration();
cfg.setInstanceName("instanceName").setZooKeeperHosts("ZookeeperHostsString");
cfg.setUser("root").setPassword("");
cfg.setGraphName(graphDirectoryName).setCreate(true).setAutoFlush(true).setInstanceType(InstanceType.Mock);
return cfg;
return new AccumuloGraphConfiguration().setInstanceType(InstanceType.Mock)
.setGraphName(graphDirectoryName).setCreate(true);
}
public static Graph makeGraph(String name) {
return GraphFactory.open(generateGraphConfig(name).getConfiguration());
return GraphFactory.open(generateGraphConfig(name));
}
}

View File

@@ -2,55 +2,117 @@ package edu.jhuapl.tinkerpop;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.junit.Test;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.Vertex;
public class AutoIndexTest {
@Test
public void testIndexCreation() throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException {
AccumuloGraph ag = (AccumuloGraph) GraphFactory.open(AccumuloGraphTestUtils.generateGraphConfig("AutoIndexTest").setAutoIndex(true).getConfiguration());
String VERT = "1234";
String KEY = "name";
String VALUE = "bananaman";
public void testVertexAutoIndex() throws Exception {
AccumuloGraph graph = (AccumuloGraph) GraphFactory.open(AccumuloGraphTestUtils
.generateGraphConfig("VertexAutoIndexTest").setAutoIndex(true).getConfiguration());
String id = "1234";
String key = "name";
String value = "bananaman";
Vertex v1 = ag.addVertex(VERT);
v1.setProperty(KEY, VALUE);
Vertex v1 = graph.addVertex(id);
v1.setProperty(key, value);
Scanner scan = ag.getVertexIndexScanner();
for (Entry<Key,Value> kv : scan) {
assertEquals(new String(AccumuloByteSerializer.serialize(VALUE)), kv.getKey().getRow().toString());
assertEquals(KEY, kv.getKey().getColumnFamily().toString());
assertEquals(VERT, kv.getKey().getColumnQualifier().toString());
Iterable<Element> elements = graph.getGlobals()
.getVertexKeyIndexWrapper().readElementsFromIndex(key, value);
int count = 0;
for (Element element : elements) {
assertTrue(element instanceof Vertex);
assertEquals(id, element.getId());
assertEquals(value, element.getProperty(key));
count++;
}
assertEquals(1, count);
graph.removeVertex(v1);
elements = graph.getGlobals()
.getVertexKeyIndexWrapper().readElementsFromIndex(key, value);
assertEquals(0, count(elements));
}
@Test
public void testRegularCreation() throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException {
AccumuloGraph ag = (AccumuloGraph) GraphFactory.open(AccumuloGraphTestUtils.generateGraphConfig("NoAutoIndexTest").getConfiguration());
String VERT = "1234";
String KEY = "name";
String VALUE = "bananaman";
public void testVertexNoAutoIndex() throws Exception {
AccumuloGraph graph = (AccumuloGraph) GraphFactory.open(AccumuloGraphTestUtils
.generateGraphConfig("VertexNoAutoIndexTest").getConfiguration());
String id = "1234";
String key = "name";
String value = "bananaman";
Vertex v1 = ag.addVertex(VERT);
v1.setProperty(KEY, VALUE);
Scanner scan = ag.getVertexIndexScanner();
for (Entry<Key,Value> kv : scan) {
assertTrue(false);
}
Vertex v1 = graph.addVertex(id);
v1.setProperty(key, value);
Iterable<Element> elements = graph.getGlobals()
.getVertexKeyIndexWrapper().readElementsFromIndex(key, value);
assertEquals(0, count(elements));
}
@Test
public void testEdgeAutoIndex() throws Exception {
AccumuloGraph graph = (AccumuloGraph) GraphFactory.open(AccumuloGraphTestUtils
.generateGraphConfig("EdgeAutoIndex").setAutoIndex(true).getConfiguration());
String id1 = "A";
String id2 = "B";
String eid = "X";
String key = "name";
String value = "bananaman";
Vertex v1 = graph.addVertex(id1);
Vertex v2 = graph.addVertex(id2);
Edge e = graph.addEdge(eid, v1, v2, "edge");
e.setProperty(key, value);
Iterable<Element> elements = graph.getGlobals()
.getEdgeKeyIndexWrapper().readElementsFromIndex(key, value);
int count = 0;
for (Element element : elements) {
assertTrue(element instanceof Edge);
assertEquals(eid, element.getId());
assertEquals(value, element.getProperty(key));
count++;
}
assertEquals(1, count);
graph.removeVertex(v1);
elements = graph.getGlobals()
.getEdgeKeyIndexWrapper().readElementsFromIndex(key, value);
assertEquals(0, count(elements));
}
@Test
public void testEdgeNoAutoIndex() throws Exception {
AccumuloGraph graph = (AccumuloGraph) GraphFactory.open(AccumuloGraphTestUtils
.generateGraphConfig("EdgeNoAutoIndexTest").getConfiguration());
String id1 = "A";
String id2 = "B";
String eid = "X";
String key = "name";
String value = "bananaman";
Vertex v1 = graph.addVertex(id1);
Vertex v2 = graph.addVertex(id2);
Edge e = graph.addEdge(eid, v1, v2, "edge");
e.setProperty(key, value);
Iterable<Element> elements = graph.getGlobals()
.getEdgeKeyIndexWrapper().readElementsFromIndex(key, value);
assertEquals(0, count(elements));
}
@SuppressWarnings("unused")
private static int count(Iterable<?> it) {
int count = 0;
for (Object obj : it) {
count++;
}
return count;
}
}

View File

@@ -24,6 +24,8 @@ import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.cache.ElementCache;
public class ElementCacheTest {
@Test

View File

@@ -0,0 +1,232 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop;
import static org.junit.Assert.*;
import org.junit.Test;
import com.google.common.collect.Sets;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
/**
* Tests related to {@link Element}-based property caching.
*/
public class ElementPropertyCachingTest {
private static final int TIMEOUT = 300000;
private static final String NON_CACHED = "noncached";
private static final String CACHED = "cached";
@Test
public void testCachingDisabled() {
AccumuloGraphConfiguration cfg =
AccumuloGraphTestUtils.generateGraphConfig("cachingDisabled");
assertTrue(cfg.getPropertyCacheTimeout(null) <= 0);
assertTrue(cfg.getPropertyCacheTimeout(NON_CACHED) <= 0);
assertTrue(cfg.getPropertyCacheTimeout(CACHED) <= 0);
Graph graph = open(cfg);
load(graph);
AccumuloVertex a = (AccumuloVertex) graph.getVertex("A");
AccumuloVertex b = (AccumuloVertex) graph.getVertex("B");
AccumuloVertex c = (AccumuloVertex) graph.getVertex("C");
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(true, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(true, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
a.removeProperty(NON_CACHED);
b.removeProperty(NON_CACHED);
c.removeProperty(NON_CACHED);
a.removeProperty(CACHED);
b.removeProperty(CACHED);
c.removeProperty(CACHED);
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(null, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(null, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
graph.shutdown();
}
@Test
public void testSpecificCaching() {
AccumuloGraphConfiguration cfg =
AccumuloGraphTestUtils.generateGraphConfig("getProperty");
cfg.setPropertyCacheTimeout(CACHED, TIMEOUT);
assertTrue(cfg.getPropertyCacheTimeout(null) <= 0);
assertTrue(cfg.getPropertyCacheTimeout(NON_CACHED) <= 0);
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(CACHED));
Graph graph = open(cfg);
load(graph);
AccumuloVertex a = (AccumuloVertex) graph.getVertex("A");
AccumuloVertex b = (AccumuloVertex) graph.getVertex("B");
AccumuloVertex c = (AccumuloVertex) graph.getVertex("C");
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(true, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(true, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(true, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(CACHED), c.getPropertyCache().keySet());
a.removeProperty(NON_CACHED);
b.removeProperty(NON_CACHED);
c.removeProperty(NON_CACHED);
a.removeProperty(CACHED);
b.removeProperty(CACHED);
c.removeProperty(CACHED);
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(null, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(null, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
graph.shutdown();
}
@Test
public void testAllCaching() {
AccumuloGraphConfiguration cfg =
AccumuloGraphTestUtils.generateGraphConfig("setProperty");
cfg.setPropertyCacheTimeout(null, TIMEOUT);
cfg.setPropertyCacheTimeout(CACHED, TIMEOUT);
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(null));
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(NON_CACHED));
assertEquals(TIMEOUT, cfg.getPropertyCacheTimeout(CACHED));
Graph graph = open(cfg);
load(graph);
AccumuloVertex a = (AccumuloVertex) graph.getVertex("A");
AccumuloVertex b = (AccumuloVertex) graph.getVertex("B");
AccumuloVertex c = (AccumuloVertex) graph.getVertex("C");
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(true, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(true, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(true, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(true, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(NON_CACHED), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(CACHED), c.getPropertyCache().keySet());
a.removeProperty(NON_CACHED);
b.removeProperty(NON_CACHED);
c.removeProperty(NON_CACHED);
a.removeProperty(CACHED);
b.removeProperty(CACHED);
c.removeProperty(CACHED);
assertEquals(null, a.getProperty(NON_CACHED));
assertEquals(null, b.getProperty(NON_CACHED));
assertEquals(null, c.getProperty(NON_CACHED));
assertEquals(null, a.getProperty(CACHED));
assertEquals(null, b.getProperty(CACHED));
assertEquals(null, c.getProperty(CACHED));
assertEquals(null, a.getPropertyCache().get(NON_CACHED));
assertEquals(null, b.getPropertyCache().get(NON_CACHED));
assertEquals(null, c.getPropertyCache().get(NON_CACHED));
assertEquals(null, a.getPropertyCache().get(CACHED));
assertEquals(null, b.getPropertyCache().get(CACHED));
assertEquals(null, c.getPropertyCache().get(CACHED));
assertEquals(Sets.newHashSet(), a.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), b.getPropertyCache().keySet());
assertEquals(Sets.newHashSet(), c.getPropertyCache().keySet());
graph.shutdown();
}
private static Graph open(AccumuloGraphConfiguration cfg) {
return GraphFactory.open(cfg);
}
private static void load(Graph graph) {
graph.addVertex("A");
graph.addVertex("B").setProperty(NON_CACHED, true);
graph.addVertex("C").setProperty(CACHED, true);
}
}

View File

@@ -18,6 +18,8 @@ import static org.junit.Assert.*;
import org.junit.Test;
import edu.jhuapl.tinkerpop.cache.PropertyCache;
/**
* Test the {@link PropertyCache} object.
*/