Move get functionality out of AccumuloIndex to IndexTableWrapper

This commit is contained in:
Michael Lieberman
2015-01-21 13:23:23 -05:00
parent 2dd9e66cf2
commit e59c590eae
2 changed files with 75 additions and 67 deletions

View File

@@ -14,29 +14,13 @@
*/
package edu.jhuapl.tinkerpop;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.CloseableIterable;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Index;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.parser.EdgeIndexParser;
import edu.jhuapl.tinkerpop.parser.ElementIndexParser;
import edu.jhuapl.tinkerpop.parser.VertexIndexParser;
import edu.jhuapl.tinkerpop.tables.NamedIndexTableWrapper;
/**
@@ -96,11 +80,7 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
@Override
public CloseableIterable<T> get(String key, Object value) {
Scanner scan = getScanner();
byte[] id = AccumuloByteSerializer.serialize(value);
scan.setRange(Range.exact(new Text(id)));
scan.fetchColumnFamily(new Text(key));
return new IndexIterable(globals.getGraph(), scan, indexedType);
return indexWrapper.readElementsFromIndex(key, value);
}
@Override
@@ -129,48 +109,4 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
private BatchWriter getWriter() {
return globals.getGraph().getWriter(getTableName());
}
private Scanner getScanner() {
return globals.getGraph().getScanner(getTableName());
}
private class IndexIterable implements CloseableIterable<T> {
private ScannerBase scan;
private Class<T> indexedType;
private IndexIterable(AccumuloGraph parent, ScannerBase scan,
Class<T> indexedType) {
this.scan = scan;
this.indexedType = indexedType;
}
@Override
public Iterator<T> iterator() {
final ElementIndexParser<? extends AccumuloElement> parser =
Vertex.class.equals(indexedType) ? new VertexIndexParser(globals) :
new EdgeIndexParser(globals);
if (scan != null) {
return new ScannerIterable<T>(scan) {
@SuppressWarnings("unchecked")
@Override
public T next(PeekingIterator<Entry<Key, Value>> iterator) {
return (T) parser.parse(Arrays.asList(iterator.next()));
}
}.iterator();
}
else {
return null;
}
}
@Override
public void close() {
if (scan != null) {
scan.close();
scan = null;
}
}
}
}

View File

@@ -14,13 +14,33 @@
*/
package edu.jhuapl.tinkerpop.tables;
import org.apache.accumulo.core.client.BatchWriter;
import com.tinkerpop.blueprints.Element;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.CloseableIterable;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloElement;
import edu.jhuapl.tinkerpop.AccumuloGraphUtils;
import edu.jhuapl.tinkerpop.GlobalInstances;
import edu.jhuapl.tinkerpop.ScannerIterable;
import edu.jhuapl.tinkerpop.mutator.Mutators;
import edu.jhuapl.tinkerpop.mutator.index.IndexValueMutator;
import edu.jhuapl.tinkerpop.parser.EdgeIndexParser;
import edu.jhuapl.tinkerpop.parser.ElementIndexParser;
import edu.jhuapl.tinkerpop.parser.VertexIndexParser;
/**
* Wrapper around index tables.
@@ -74,4 +94,56 @@ public abstract class IndexTableWrapper extends BaseTableWrapper {
globals.checkedFlush();
}
}
/**
* Get elements with the key/value pair.
* @param key
* @param value
* @return
*/
@SuppressWarnings("unchecked")
public <T extends Element> CloseableIterable<T> readElementsFromIndex(String key, Object value) {
Scanner scan = getScanner();
byte[] id = AccumuloByteSerializer.serialize(value);
scan.setRange(Range.exact(new Text(id)));
scan.fetchColumnFamily(new Text(key));
return new IndexIterable(scan);
}
private class IndexIterable<T extends Element> implements CloseableIterable<T> {
private ScannerBase scan;
private IndexIterable(ScannerBase scan) {
this.scan = scan;
}
@Override
public Iterator<T> iterator() {
final ElementIndexParser<? extends AccumuloElement> parser =
Vertex.class.equals(elementType) ? new VertexIndexParser(globals) :
new EdgeIndexParser(globals);
if (scan != null) {
return new ScannerIterable<T>(scan) {
@SuppressWarnings("unchecked")
@Override
public T next(PeekingIterator<Entry<Key, Value>> iterator) {
return (T) parser.parse(Arrays.asList(iterator.next()));
}
}.iterator();
}
else {
return null;
}
}
@Override
public void close() {
if (scan != null) {
scan.close();
scan = null;
}
}
}
}