Move getEdges to EdgeTableWrapper

Make getVertices and getEdges cache their results
Remove confusing AccumuloEdge constructors and other methods
Added some debugging method for the backing tables
Bug fixes in unit tests
This commit is contained in:
Michael Lieberman
2015-01-05 16:14:34 -05:00
parent 37b4c8adc9
commit 899ed892fc
8 changed files with 219 additions and 149 deletions

View File

@@ -17,68 +17,42 @@ package edu.jhuapl.tinkerpop;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.ExceptionFactory;
import com.tinkerpop.blueprints.util.StringFactory;
/**
* TODO
*/
public class AccumuloEdge extends AccumuloElement implements Edge {
String label;
String inId;
String outId;
Vertex inVertex;
Vertex outVertex;
private String label;
private Vertex inVertex;
private Vertex outVertex;
public AccumuloEdge(GlobalInstances globals, String id) {
this(globals, id, null);
}
public AccumuloEdge(GlobalInstances globals, String id, String label) {
this(globals, id, label, (Vertex) null, (Vertex) null);
this(globals, id, null, null, null);
}
public AccumuloEdge(GlobalInstances globals, String id,
String label, Vertex inVertex, Vertex outVertex) {
Vertex inVertex, Vertex outVertex, String label) {
super(globals, id, Edge.class);
this.label = label;
this.inVertex = inVertex;
this.outVertex = outVertex;
}
public AccumuloEdge(GlobalInstances globals, String id, String label, String inVertex, String outVertex) {
super(globals, id, Edge.class);
this.label = label;
this.inId = inVertex;
this.outId = outVertex;
}
@Override
public Vertex getVertex(Direction direction) throws IllegalArgumentException {
switch (direction) {
case IN:
if (inVertex == null) {
if (inId == null) {
inVertex = globals.getGraph().getEdgeVertex(id, direction);
inId = inVertex.getId().toString();
} else {
inVertex = globals.getGraph().getVertex(inId);
}
}
return inVertex;
case OUT:
if (outVertex == null) {
if (outId == null) {
outVertex = globals.getGraph().getEdgeVertex(id, direction);
outId = outVertex.getId().toString();
} else {
outVertex = globals.getGraph().getVertex(outId);
}
}
return outVertex;
case BOTH:
throw ExceptionFactory.bothIsNotSupported();
default:
throw new RuntimeException("Unexpected direction: " + direction);
if (!Direction.IN.equals(direction) && !Direction.OUT.equals(direction)) {
throw new IllegalArgumentException("Invalid direction: "+direction);
}
// The vertex information needs to be loaded.
if (inVertex == null || outVertex == null || label == null) {
System.out.println("Loading information for edge: "+this);
globals.getEdgeWrapper().loadEndpointsAndLabel(this);
}
return Direction.IN.equals(direction) ? inVertex : outVertex;
}
@Override
@@ -95,28 +69,17 @@ public class AccumuloEdge extends AccumuloElement implements Edge {
globals.getGraph().removeEdge(this);
}
public String getInId() {
return inId;
public void setVertices(AccumuloVertex inVertex, AccumuloVertex outVertex) {
this.inVertex = inVertex;
this.outVertex = outVertex;
}
public String getOutId() {
return outId;
}
protected void setInId(String id){
inId = id;
}
protected void setOutId(String id){
outId = id;
}
protected void setLabel(String label){
public void setLabel(String label) {
this.label = label;
}
@Override
public String toString() {
return "[" + getId() + ":" + getVertex(Direction.OUT) + " -> " + getLabel() + " -> " + getVertex(Direction.IN) + "]";
return "[" + getId() + ":" + inVertex + " -> " + label + " -> " + outVertex + "]";
}
}

View File

@@ -46,7 +46,6 @@ import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Features;
@@ -541,8 +540,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
String myID = id.toString();
AccumuloEdge edge = new AccumuloEdge(globals, myID,
label, inVertex, outVertex);
AccumuloEdge edge = new AccumuloEdge(globals, myID, inVertex, outVertex, label);
// TODO we arent suppose to make sure the given edge ID doesn't already
// exist?
@@ -592,26 +590,6 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
return edge;
}
private void preloadProperties(Iterator<Entry<Key, Value>> iter, AccumuloElement e) {
while (iter.hasNext()) {
Entry<Key,Value> entry = iter.next();
Key key = entry.getKey();
String attr = key.getColumnFamily().toString();
if (SLABEL.equals(attr)) {
if (!key.getColumnQualifier().toString().equals(SEXISTS)) {
AccumuloEdge edge = (AccumuloEdge) e;
String[] ids = key.getColumnQualifier().toString().split("_");
edge.setInId(ids[0]);
edge.setOutId(ids[1]);
edge.setLabel(entry.getValue().toString());
}
continue;
}
Object val = AccumuloByteSerializer.deserialize(entry.getValue().get());
e.cacheProperty(attr, val);
}
}
@Override
public void removeEdge(Edge edge) {
if (!config.getIndexableGraphDisabled())
@@ -664,34 +642,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
@Override
public Iterable<Edge> getEdges() {
BatchScanner scan = getElementBatchScanner(Edge.class);
scan.fetchColumnFamily(TLABEL);
if (config.getPreloadedProperties() != null) {
for (String x : config.getPreloadedProperties()) {
scan.fetchColumnFamily(new Text(x));
}
}
return new ScannerIterable<Edge>(scan) {
@Override
public Edge next(PeekingIterator<Entry<Key,Value>> iterator) {
// TODO I dont know if this should work with a batch scanner....
Entry<Key,Value> entry = iterator.next();
AccumuloEdge edge = new AccumuloEdge(globals, entry.getKey().getRow().toString(), AccumuloByteSerializer
.deserialize(entry.getValue().get()).toString());
String rowid = entry.getKey().getRow().toString();
List<Entry<Key,Value>> vals = new ArrayList<Entry<Key,Value>>();
while (iterator.peek() != null && rowid.compareToIgnoreCase(iterator.peek().getKey().getRow().toString()) == 0) {
vals.add(iterator.next());
}
preloadProperties(vals.iterator(), edge);
caches.cache(edge, Edge.class);
return edge;
}
};
return globals.getEdgeWrapper().getEdges();
}
@Override
@@ -743,7 +694,9 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
if (k.getColumnFamily().compareTo(AccumuloGraph.TLABEL) == 0) {
String[] vals = k.getColumnQualifier().toString().split(AccumuloGraph.IDDELIM);
return new AccumuloEdge(globals, k.getRow().toString(), null, vals[0], vals[1]);
return new AccumuloEdge(globals, k.getRow().toString(),
new AccumuloVertex(globals, vals[0]),
new AccumuloVertex(globals, vals[1]), null);
}
return new AccumuloEdge(globals, k.getRow().toString());
}
@@ -889,37 +842,6 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
}
}
/**
* @deprecated Move to appropriate place
* @param edgeId
* @param direction
* @return
*/
Vertex getEdgeVertex(String edgeId, Direction direction) {
Scanner s = getElementScanner(Edge.class);
try {
s.setRange(new Range(edgeId));
s.fetchColumnFamily(TLABEL);
Iterator<Entry<Key,Value>> iter = s.iterator();
if (!iter.hasNext()) {
return null;
}
String id;
String val = iter.next().getKey().getColumnQualifier().toString();
if (direction == Direction.IN) {
id = val.split(IDDELIM)[0];
} else {
id = val.split(IDDELIM)[1];
}
Vertex v = new AccumuloVertex(globals, id);
caches.cache(v, Vertex.class);
return v;
} finally {
s.close();
}
}
private void nullCheckProperty(String key, Object val) {
if (key == null) {
throw ExceptionFactory.propertyKeyCanNotBeNull();

View File

@@ -20,6 +20,9 @@ import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.VertexQuery;
import com.tinkerpop.blueprints.util.DefaultVertexQuery;
/**
* TODO
*/
public class AccumuloVertex extends AccumuloElement implements Vertex {
public AccumuloVertex(GlobalInstances globals, String id) {

View File

@@ -0,0 +1,70 @@
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.jhuapl.tinkerpop.parser;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.AccumuloGraph;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.GlobalInstances;
/**
* TODO
*/
public class EdgeParser extends ElementParser<AccumuloEdge> {
public EdgeParser(GlobalInstances globals) {
super(globals);
}
@Override
public AccumuloEdge parse(String id, Iterable<Entry<Key,Value>> entries) {
AccumuloEdge edge = makeEdge(id, entries);
setInMemoryProperties(edge, entries);
return edge;
}
/**
* Make and return an edge object. If the entries
* contain label/endpoint information, set those too.
* @param id
* @param entries
* @return
*/
private AccumuloEdge makeEdge(String id, Iterable<Entry<Key,Value>> entries) {
for (Entry<Key, Value> entry : entries) {
String cf = entry.getKey().getColumnFamily().toString();
if (AccumuloGraph.SLABEL.equals(cf)) {
String cq = entry.getKey().getColumnQualifier().toString();
String[] parts = cq.split(AccumuloGraph.IDDELIM);
String inVertexId = parts[0];
String outVertexId = parts[1];
String label = AccumuloByteSerializer.deserialize(entry.getValue().get());
return new AccumuloEdge(globals, id,
new AccumuloVertex(globals, inVertexId),
new AccumuloVertex(globals, outVertexId), label);
}
}
// This should not happen.
throw new AccumuloGraphException("Unable to parse edge from entries");
}
}

View File

@@ -11,8 +11,12 @@
******************************************************************************/
package edu.jhuapl.tinkerpop.tables;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.GlobalInstances;
@@ -47,4 +51,12 @@ public abstract class BaseTableWrapper {
throw new AccumuloGraphException(e);
}
}
public void dump() {
System.out.println("Dump of table "+tableName+":");
Scanner s = getScanner();
for (Entry<Key, Value> entry : s) {
System.out.println(" "+entry);
}
}
}

View File

@@ -11,11 +11,30 @@
******************************************************************************/
package edu.jhuapl.tinkerpop.tables;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.AccumuloGraph;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.edge.EdgeMutator;
import edu.jhuapl.tinkerpop.parser.EdgeParser;
/**
@@ -45,4 +64,68 @@ public class EdgeTableWrapper extends ElementTableWrapper {
Mutators.apply(getWriter(), new EdgeMutator.Delete(edge));
globals.checkedFlush();
}
public Iterable<Edge> getEdges() {
Scanner scan = getScanner();
scan.fetchColumnFamily(AccumuloGraph.TLABEL);
if (globals.getConfig().getPreloadedProperties() != null) {
for (String key : globals.getConfig().getPreloadedProperties()) {
scan.fetchColumnFamily(new Text(key));
}
}
final EdgeParser parser = new EdgeParser(globals);
return new ScannerIterable<Edge>(scan) {
@Override
public Edge next(PeekingIterator<Entry<Key, Value>> iterator) {
// TODO could also check local cache before creating a new instance?
String rowId = iterator.peek().getKey().getRow().toString();
List<Entry<Key, Value>> entries =
new ArrayList<Entry<Key, Value>>();
// MDL 05 Jan 2014: Why is this equalsIgnoreCase??
while (iterator.peek() != null && rowId.equalsIgnoreCase(iterator
.peek().getKey().getRow().toString())) {
entries.add(iterator.next());
}
AccumuloEdge edge = parser.parse(rowId, entries);
globals.getCaches().cache(edge, Edge.class);
return edge;
}
};
}
public void loadEndpointsAndLabel(AccumuloEdge edge) {
Scanner s = getScanner();
try {
s.setRange(new Range(edge.getId().toString()));
s.fetchColumnFamily(AccumuloGraph.TLABEL);
Iterator<Entry<Key,Value>> iter = s.iterator();
if (!iter.hasNext()) {
dump();
throw new AccumuloGraphException("Unable to find edge row: "+edge);
}
Entry<Key, Value> entry = iter.next();
String cq = entry.getKey().getColumnQualifier().toString();
String[] ids = cq.split(AccumuloGraph.IDDELIM);
String label = AccumuloByteSerializer.deserialize(entry.getValue().get());
edge.setVertices(new AccumuloVertex(globals, ids[0]),
new AccumuloVertex(globals, ids[1]));
edge.setLabel(label);
} finally {
s.close();
}
}
}

View File

@@ -102,11 +102,13 @@ public class VertexTableWrapper extends ElementTableWrapper {
AccumuloEdge edge;
if (kv.getKey().getColumnFamily().toString().equalsIgnoreCase(AccumuloGraph.SINEDGE)) {
edge = new AccumuloEdge(globals, parts[1], label,
kv.getKey().getRow().toString(), parts[0]);
edge = new AccumuloEdge(globals, parts[1],
new AccumuloVertex(globals, kv.getKey().getRow().toString()),
new AccumuloVertex(globals, parts[0]), label);
} else {
edge = new AccumuloEdge(globals, parts[1], label,
parts[0], kv.getKey().getRow().toString());
edge = new AccumuloEdge(globals, parts[1],
new AccumuloVertex(globals, parts[0]),
new AccumuloVertex(globals, kv.getKey().getRow().toString()), label);
}
globals.getCaches().cache(edge, Edge.class);
@@ -176,7 +178,10 @@ public class VertexTableWrapper extends ElementTableWrapper {
entries.add(iterator.next());
}
return parser.parse(rowId, entries);
AccumuloVertex vertex = parser.parse(rowId, entries);
globals.getCaches().cache(vertex, Vertex.class);
return vertex;
}
};
}

View File

@@ -14,8 +14,13 @@
*/
package edu.jhuapl.tinkerpop;
import java.io.IOException;
import java.lang.reflect.Method;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import com.tinkerpop.blueprints.EdgeTestSuite;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
@@ -114,7 +119,14 @@ public class AccumuloGraphTest extends GraphTest {
public void dropGraph(final String graphDirectoryName) {
if (graphDirectoryName != null) {
((AccumuloGraph) generateGraph(graphDirectoryName)).clear();
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils.generateGraphConfig(graphDirectoryName);
try {
for (String table : cfg.getConnector().tableOperations().list()) {
cfg.getConnector().tableOperations().delete(table);
}
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
}