Move index-based getEdges to EdgeIndexTableWrapper

This commit is contained in:
Michael Lieberman
2015-01-20 16:36:07 -05:00
parent 16b7284f08
commit 2544ac0c30
2 changed files with 57 additions and 23 deletions

View File

@@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.io.Text;
@@ -565,27 +564,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
}
if (config.getAutoIndex() || getIndexedKeys(Edge.class).contains(key)) {
// Use the index
Scanner s = getEdgeIndexScanner();
byte[] val = AccumuloByteSerializer.serialize(value);
Text tVal = new Text(val);
s.setRange(new Range(tVal, tVal));
s.fetchColumnFamily(new Text(key));
return new ScannerIterable<Edge>(s) {
@Override
public Edge next(PeekingIterator<Entry<Key,Value>> iterator) {
Entry<Key,Value> kv = iterator.next();
Edge e = globals.getCaches().retrieve(kv.getKey().getColumnQualifier().toString(), Edge.class);
e = (e == null ? new AccumuloEdge(globals, kv.getKey().getColumnQualifier().toString()) : e);
((AccumuloElement) e).cacheProperty(kv.getKey().getColumnFamily().toString(),
AccumuloByteSerializer.deserialize(kv.getKey().getRow().getBytes()));
globals.getCaches().cache(e, Edge.class);
return e;
}
};
return globals.getEdgeIndexWrapper().getEdges(key, value);
} else {
return globals.getEdgeWrapper().getEdges(key, value);
}

View File

@@ -14,9 +14,22 @@
*/
package edu.jhuapl.tinkerpop.tables;
import com.tinkerpop.blueprints.Edge;
import java.util.Arrays;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Edge;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.parser.EdgeIndexParser;
/**
* Wrapper around {@link Edge} index table.
@@ -27,4 +40,46 @@ public class EdgeIndexTableWrapper extends IndexTableWrapper {
super(globals, Edge.class, globals.getConfig()
.getEdgeKeyIndexTableName());
}
/**
* Retrieve edges from the index table based
* on the given key/value.
* @param key
* @param value
* @return
*/
public Iterable<Edge> getEdges(String key, Object value) {
Scanner s = getScanner();
Text row = new Text(AccumuloByteSerializer.serialize(value));
s.setRange(Range.exact(row));
s.fetchColumnFamily(new Text(key));
final EdgeIndexParser parser = new EdgeIndexParser(globals);
return new ScannerIterable<Edge>(s) {
@Override
public Edge next(PeekingIterator<Entry<Key, Value>> iterator) {
Entry<Key, Value> entry = iterator.next();
AccumuloEdge e = parser.parse(Arrays.asList(entry));
// Check if we have it cached already, in which
// case use the cached version.
AccumuloEdge cached = (AccumuloEdge) globals.getCaches()
.retrieve(e.getId(), Edge.class);
if (cached != null) {
for (String key : e.getPropertyKeysInMemory()) {
cached.setPropertyInMemory(key, e.getPropertyInMemory(key));
}
return cached;
}
// We don't have it, so cache the new one and return it.
globals.getCaches().cache(e, Edge.class);
return e;
}
};
}
}