More source and typo cleanup

This commit is contained in:
Michael Lieberman
2014-12-23 11:24:20 -05:00
parent 1b58eb42e8
commit 2a74321b5a
6 changed files with 84 additions and 156 deletions

View File

@@ -53,7 +53,7 @@ public final class AccumuloByteSerializer {
}
};
public static <T> T desserialize(byte[] target) {
public static <T> T deserialize(byte[] target) {
if (target[0] == NULL) {
return null;
}

View File

@@ -64,123 +64,61 @@ import com.tinkerpop.blueprints.util.StringFactory;
/**
*
* This is an implementation of Tinkerpop's Graph API backed by Apache Accumulo.
* The implementation currently supports the {@link IndexableGraph}
* and {@link KeyIndexableGraph} interfaces.
* This is an implementation of TinkerPop's graph API
* backed by Apache Accumulo. In addition to the basic
* Graph interface, the implementation
* supports {@link IndexableGraph} and {@link KeyIndexableGraph}.
*
* It currently relies on 6 to N tables with the format.
* Every index created by the indexable interface gets its own table.
* <p/>Tables have the following formats.
*
* VertexTable
* <table border=1>
* <thead>
* <tr>
* <th>ROWID</th>
* <th>COLFAM</th>
* <th>COLQAL</th>
* <th>VALUE</th>
* </tr>
* </thead> <tbody>
* <tr>
* <td>VertexID</td>
* <td>LABEL</td>
* <td>EXISTS</td>
* <td>[empty]</td>
* </tr>
* <tr>
* <td>VertexID</td>
* <td>INEDGE</td>
* <td>InVertedID_EdgeID</td>
* <td>EdgeLabel</td>
* </tr>
* <tr>
* <td>VertexID</td>
* <td>OUTEDGE</td>
* <td>OutVertedID_EdgeID</td>
* <td>EdgeLabel</td>
* </tr>
* <tr>
* <td>VertexID</td>
* <td>PropertyKey</td>
* <td>[empty]</td>
* <td>PropertyValue</td>
* </tr>
* </tbody>
* <p/>
* <table border="1">
* <caption>Vertex table</caption>
* <thead>
* <tr><th>ROWID</th><th>COLFAM</th><th>COLQUAL</th><th>VALUE</th></tr>
* </thead>
* <tbody>
* <tr><td>VertexID</td><td>LABEL</td><td>EXISTS</td><td>[empty]</td></tr>
* <tr><td>VertexID</td><td>INEDGE</td><td>InVertexID_EdgeID</td><td>EdgeLabel</td></tr>
* <tr><td>VertexID</td><td>OUTEDGE</td><td>OutVertexID_EdgeID</td><td>EdgeLabel</td></tr>
* <tr><td>VertexID</td><td>PropertyKey</td><td>[empty]</td><td>PropertyValue</td></tr>
* </tbody>
* </table>
*
* EdgeTable
* <table border=1>
* <thead>
* <tr>
* <th>ROWID</th>
* <th>COLFAM</th>
* <th>COLQAL</th>
* <th>VALUE</th>
* </tr>
* </thead> <tbody>
* <tr>
* <td>EdgeID</td>
* <td>LABEL</td>
* <td>[empty]</td>
* <td>Encoded LabelValue</td>
* </tr>
* <tr>
* <td>EdgeID</td>
* <td>INEDGE</td>
* <td>InVertedID</td>
* <td>[empty]</td>
* </tr>
* <tr>
* <td>EdgeID</td>
* <td>OUTEDGE</td>
* <td>OutVertedID</td>
* <td>[empty]</td>
* </tr>
* <tr>
* <td>EdgeID</td>
* <td>PropertyKey</td>
* <td>[empty]</td>
* <td>Encoded Value</td>
* </tr>
* </tbody>
* <p/>
* <table border="1">
* <caption>Edge table</caption>
* <thead>
* <tr> <th>ROWID</th><th>COLFAM</th><th>COLQUAL</th><th>VALUE</th></tr>
* </thead>
* <tbody>
* <tr><td>EdgeID</td><td>LABEL</td><td>[empty]</td><td>Encoded LabelValue</td></tr>
* <tr><td>EdgeID</td><td>INEDGE</td><td>InVertexID</td><td>[empty]</td></tr>
* <tr><td>EdgeID</td><td>OUTEDGE</td><td>OutVertexID</td><td>[empty]</td></tr>
* <tr><td>EdgeID</td><td>PropertyKey</td><td>[empty]</td><td>Encoded Value</td></tr>
* </tbody>
* </table>
*
* VertexIndexTable/EdgeIndexTable
* <table border=1>
* <thead>
* <tr>
* <th>ROWID</th>
* <th>COLFAM</th>
* <th>COLQAL</th>
* <th>VALUE</th>
* </tr>
* </thead> <tbody>
* <tr>
* <td>Encoded PropertyValue</td>
* <td>PropertyKey</td>
* <td>ElementID</td>
* <td>[empty]</td>
* </tr>
* </tbody>
* <p/>
* <table border="1">
* <caption>Vertex / edge index tables (each index gets its own table)</caption>
* <thead>
* <tr><th>ROWID</th><th>COLFAM</th><th>COLQUAL</th><th>VALUE</th></tr>
* </thead>
* <tbody>
* <tr><td>Encoded PropertyValue</td><td>PropertyKey</td><td>ElementID</td><td>[empty]</td></tr>
* </tbody>
* </table>
*
* MetadataTable/KeyMetadataTable
* <table border=1>
* <thead>
* <tr>
* <th>ROWID</th>
* <th>COLFAM</th>
* <th>COLQAL</th>
* <th>VALUE</th>
* </tr>
* </thead> <tbody>
* <tr>
* <td>IndexName</td>
* <td>IndexClassType</td>
* <td>[empty]</td>
* <td>[empty]</td>
* </tr>
* </tbody>
* <p/>
* <table border="1">
* <caption>Metadata/key metadata tables</caption>
* <thead>
* <tr><th>ROWID</th><th>COLFAM</th><th>COLQUAL</th><th>VALUE</th></tr>
* </thead>
* <tbody>
* <tr><td>IndexName</td><td>IndexClassType</td><td>[empty]</td><td>[empty]</td></tr>
* </tbody>
* </table>
*/
public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
@@ -651,7 +589,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
v = (v == null ? new AccumuloVertex(AccumuloGraph.this, key.getColumnQualifier().toString()) : v);
int timeout = config.getPropertyCacheTimeout(key.getColumnFamily().toString());
if (timeout != -1) {
v.cacheProperty(key.getColumnFamily().toString(), AccumuloByteSerializer.desserialize(key.getRow().getBytes()), timeout);
v.cacheProperty(key.getColumnFamily().toString(), AccumuloByteSerializer.deserialize(key.getRow().getBytes()), timeout);
}
if (vertexCache != null) {
@@ -685,7 +623,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
v = (v == null ? new AccumuloVertex(AccumuloGraph.this, kv.getKey().getRow().toString()) : v);
int timeout = config.getPropertyCacheTimeout(kv.getKey().getColumnFamily().toString());
if (timeout != -1) {
v.cacheProperty(kv.getKey().getColumnFamily().toString(), AccumuloByteSerializer.desserialize(kv.getValue().get()), timeout);
v.cacheProperty(kv.getKey().getColumnFamily().toString(), AccumuloByteSerializer.deserialize(kv.getValue().get()), timeout);
}
if (vertexCache != null) {
@@ -812,7 +750,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
}
continue;
}
Object val = AccumuloByteSerializer.desserialize(entry.getValue().get());
Object val = AccumuloByteSerializer.deserialize(entry.getValue().get());
e.cacheProperty(attr, val, timeout);
}
@@ -896,7 +834,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
// TODO I dont know if this should work with a batch scanner....
Entry<Key,Value> entry = iterator.next();
AccumuloEdge edge = new AccumuloEdge(AccumuloGraph.this, entry.getKey().getRow().toString(), AccumuloByteSerializer
.desserialize(entry.getValue().get()).toString());
.deserialize(entry.getValue().get()).toString());
String rowid = entry.getKey().getRow().toString();
List<Entry<Key,Value>> vals = new ArrayList<Entry<Key,Value>>();
@@ -939,7 +877,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
int timeout = config.getPropertyCacheTimeout(kv.getKey().getColumnFamily().toString());
if (timeout != -1) {
e.cacheProperty(kv.getKey().getColumnFamily().toString(), AccumuloByteSerializer.desserialize(kv.getKey().getRow().getBytes()), timeout);
e.cacheProperty(kv.getKey().getColumnFamily().toString(), AccumuloByteSerializer.deserialize(kv.getKey().getRow().getBytes()), timeout);
}
if (edgeCache != null) {
@@ -1070,7 +1008,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
T toRet = null;
Iterator<Entry<Key,Value>> iter = s.iterator();
if (iter.hasNext()) {
toRet = AccumuloByteSerializer.desserialize(iter.next().getValue().get());
toRet = AccumuloByteSerializer.deserialize(iter.next().getValue().get());
}
s.close();
return new Pair<Integer,T>(config.getPropertyCacheTimeout(key), toRet);
@@ -1100,7 +1038,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
// Integer timeout = config.getPropertyCacheTimeoutMillis(); // Change this
while (iter.hasNext()) {
Entry<Key,Value> entry = iter.next();
Object val = AccumuloByteSerializer.desserialize(entry.getValue().get());
Object val = AccumuloByteSerializer.deserialize(entry.getValue().get());
element
.cacheProperty(entry.getKey().getColumnFamily().toString(), val, config.getPropertyCacheTimeout(entry.getKey().getColumnFamily().toString()));
}

View File

@@ -14,17 +14,13 @@
*/
package edu.jhuapl.tinkerpop;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -39,13 +35,13 @@ import com.tinkerpop.blueprints.Index;
public class AccumuloIndex<T extends Element> implements Index<T> {
Class indexedType;
Class<T> indexedType;
AccumuloGraph parent;
String indexName;
String tableName;
public AccumuloIndex(Class t, AccumuloGraph parent, String indexName) {
indexedType = t;
public AccumuloIndex(Class<T> t, AccumuloGraph parent, String indexName) {
this.indexedType = t;
this.parent = parent;
this.indexName = indexName;
tableName = parent.config.getGraphName() + "_index_" + indexName;// + "_" +
@@ -56,7 +52,7 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
parent.config.getConnector().tableOperations().create(tableName);
}
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException(e);
}
}
@@ -132,9 +128,9 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
AccumuloGraph parent;
ScannerBase scan;
boolean isClosed;
Class indexedType;
Class<T> indexedType;
IndexIterable(AccumuloGraph parent, ScannerBase scan, Class t) {
IndexIterable(AccumuloGraph parent, ScannerBase scan, Class<T> t) {
this.scan = scan;
this.parent = parent;
isClosed = false;
@@ -143,30 +139,26 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
public Iterator<T> iterator() {
if (!isClosed) {
if(indexedType.equals(Edge.class)){
return new ScannerIterable<T>(parent, scan) {
return new ScannerIterable<T>(parent, scan) {
@Override
public T next(PeekingIterator<Entry<Key,Value>> iterator) {
// TODO better use of information readily
// available...
return (T) new AccumuloEdge(parent, iterator.next().getKey().getColumnQualifier().toString());
}
}.iterator();
}else{
return new ScannerIterable<T>(parent, scan) {
@Override
public T next(PeekingIterator<Entry<Key,Value>> iterator) {
// TODO better use of information readily
// available...
return (T) new AccumuloVertex(parent, iterator.next().getKey().getColumnQualifier().toString());
}
}.iterator();
}
@Override
public T next(PeekingIterator<Entry<Key, Value>> iterator) {
String id = iterator.next()
.getKey().getColumnQualifier().toString();
// TODO better use of information readily
// available...
if (indexedType.equals(Edge.class)) {
return (T) new AccumuloEdge(parent, id);
}
else {
return (T) new AccumuloVertex(parent, id);
}
}
}.iterator();
}
else {
return null;
}
return null;
}
public void close() {
@@ -182,5 +174,4 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
public Class<T> getIndexClass() {
return indexedType;
}
}

View File

@@ -90,11 +90,11 @@ public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
String[] ids = currentKey.getColumnQualifier().toString().split(parent.IDDELIM);
edge.setSourceId(ids[1]);
edge.setDestId(ids[0]);
edge.setLabel(AccumuloByteSerializer.desserialize(entry.getValue().get()).toString());
edge.setLabel(AccumuloByteSerializer.deserialize(entry.getValue().get()).toString());
break;
default:
String propertyKey = currentKey.getColumnFamily().toString();
Object propertyValue = AccumuloByteSerializer.desserialize(entry.getValue().get());
Object propertyValue = AccumuloByteSerializer.deserialize(entry.getValue().get());
edge.prepareProperty(propertyKey, propertyValue);
}
}

View File

@@ -17,7 +17,6 @@ package edu.jhuapl.tinkerpop.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -108,7 +107,7 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
String key = in.readUTF();
byte[] data = new byte[in.readInt()];
in.readFully(data);
Object val = AccumuloByteSerializer.desserialize(data);
Object val = AccumuloByteSerializer.deserialize(data);
properties.put(key, val);
}
@@ -117,7 +116,7 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
String key = in.readUTF();
byte[] data = new byte[in.readInt()];
in.readFully(data);
Object val = AccumuloByteSerializer.desserialize(data);
Object val = AccumuloByteSerializer.deserialize(data);
newProperties.put(key, val);
}

View File

@@ -101,7 +101,7 @@ public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
break;
default:
String propertyKey = currentKey.getColumnFamily().toString();
Object propertyValue = AccumuloByteSerializer.desserialize(entry.getValue().get());
Object propertyValue = AccumuloByteSerializer.deserialize(entry.getValue().get());
vertex.prepareProperty(propertyKey, propertyValue);
}
}