Move key/value getVertices into VertexTableWrapper

This commit is contained in:
Michael Lieberman
2015-01-19 14:23:20 -05:00
parent 98a24bd62f
commit 6196dc7d8f
4 changed files with 95 additions and 46 deletions

View File

@@ -26,21 +26,21 @@ import javax.xml.namespace.QName;
public final class AccumuloByteSerializer {
static final int NULL = 'n';
public static final int NULL = 'n';
static final int BYTE = 'b';
static final int SHORT = 's';
static final int CHARACTER = 'c';
static final int INTEGER = 'i';
static final int LONG = 'l';
static final int FLOAT = 'f';
static final int DOUBLE = 'd';
static final int BOOLEAN = 'o';
static final int DATE = 't';
static final int ENUM = 'e';
static final int STRING = 'a';
static final int SERIALIZABLE = 'x';
static final int QNAME = 'q';
public static final int BYTE = 'b';
public static final int SHORT = 's';
public static final int CHARACTER = 'c';
public static final int INTEGER = 'i';
public static final int LONG = 'l';
public static final int FLOAT = 'f';
public static final int DOUBLE = 'd';
public static final int BOOLEAN = 'o';
public static final int DATE = 't';
public static final int ENUM = 'e';
public static final int STRING = 'a';
public static final int SERIALIZABLE = 'x';
public static final int QNAME = 'q';
private AccumuloByteSerializer() {
@@ -53,6 +53,7 @@ public final class AccumuloByteSerializer {
}
};
@SuppressWarnings("unchecked")
public static <T> T deserialize(byte[] target) {
if (target[0] == NULL) {
return null;

View File

@@ -492,41 +492,10 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
caches.cache(v, Vertex.class);
return v;
}
};
} else {
byte[] val = AccumuloByteSerializer.serialize(value);
if (val[0] != AccumuloByteSerializer.SERIALIZABLE) {
BatchScanner scan = getElementBatchScanner(Vertex.class);
scan.fetchColumnFamily(new Text(key));
IteratorSetting is = new IteratorSetting(10, "filter", RegExFilter.class);
RegExFilter.setRegexs(is, null, null, null, Pattern.quote(new String(val)), false);
scan.addScanIterator(is);
return new ScannerIterable<Vertex>(scan) {
@Override
public Vertex next(PeekingIterator<Entry<Key,Value>> iterator) {
Entry<Key,Value> kv = iterator.next();
Vertex v = caches.retrieve(kv.getKey().getRow().toString(), Vertex.class);
v = (v == null ? new AccumuloVertex(globals, kv.getKey().getRow().toString()) : v);
((AccumuloElement) v).cacheProperty(kv.getKey().getColumnFamily().toString(),
AccumuloByteSerializer.deserialize(kv.getValue().get()));
caches.cache(v, Vertex.class);
return v;
}
};
} else {
// TODO
throw new UnsupportedOperationException("Filtering on binary data not currently supported.");
}
return vertexWrapper.getVertices(key, value);
}
}

View File

@@ -14,11 +14,14 @@
*/
package edu.jhuapl.tinkerpop.tables;
import java.util.Collections;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import edu.jhuapl.tinkerpop.AccumuloGraphException;
@@ -47,6 +50,16 @@ public abstract class BaseTableWrapper {
}
}
protected BatchScanner getBatchScanner() {
try {
BatchScanner scanner = globals.getConfig().getConnector().createBatchScanner(tableName,
globals.getConfig().getAuthorizations(), globals.getConfig().getQueryThreads());
scanner.setRanges(Collections.singletonList(new Range()));
return scanner;
} catch (Exception e) {
throw new AccumuloGraphException(e);
}
}
protected BatchWriter getWriter() {
try {
return globals.getMtbw().getBatchWriter(tableName);

View File

@@ -17,19 +17,27 @@ package edu.jhuapl.tinkerpop.tables;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.ExceptionFactory;
import com.tinkerpop.blueprints.util.StringFactory;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloEdge;
import edu.jhuapl.tinkerpop.AccumuloElement;
import edu.jhuapl.tinkerpop.AccumuloGraph;
import edu.jhuapl.tinkerpop.AccumuloVertex;
import edu.jhuapl.tinkerpop.GlobalInstances;
@@ -188,4 +196,62 @@ public class VertexTableWrapper extends ElementTableWrapper {
}
};
}
public Iterable<Vertex> getVertices(String key, Object value) {
validateProperty(key, value);
byte[] val = AccumuloByteSerializer.serialize(value);
if (val[0] != AccumuloByteSerializer.SERIALIZABLE) {
BatchScanner scan = getBatchScanner();
scan.fetchColumnFamily(new Text(key));
IteratorSetting is = new IteratorSetting(10, "filter", RegExFilter.class);
RegExFilter.setRegexs(is, null, null, null, Pattern.quote(new String(val)), false);
scan.addScanIterator(is);
return new ScannerIterable<Vertex>(scan) {
@Override
public Vertex next(PeekingIterator<Entry<Key,Value>> iterator) {
Entry<Key, Value> kv = iterator.next();
String key = kv.getKey().getColumnFamily().toString();
Object value = AccumuloByteSerializer.deserialize(kv.getValue().get());
Vertex v = globals.getCaches().retrieve(kv.getKey().getRow().toString(), Vertex.class);
if (v == null) {
v = new AccumuloVertex(globals, kv.getKey().getRow().toString());
}
((AccumuloElement) v).setPropertyInMemory(key, value);
globals.getCaches().cache(v, Vertex.class);
return v;
}
};
} else {
// TODO
throw new UnsupportedOperationException("Filtering on binary data not currently supported.");
}
}
private void validateProperty(String key, Object val) {
nullCheckProperty(key, val);
if (key.equals(StringFactory.ID)) {
throw ExceptionFactory.propertyKeyIdIsReserved();
} else if (key.equals(StringFactory.LABEL)) {
throw ExceptionFactory.propertyKeyLabelIsReservedForEdges();
} else if (val == null) {
throw ExceptionFactory.propertyValueCanNotBeNull();
}
}
private void nullCheckProperty(String key, Object val) {
if (key == null) {
throw ExceptionFactory.propertyKeyCanNotBeNull();
} else if (val == null) {
throw ExceptionFactory.propertyValueCanNotBeNull();
} else if (key.trim().equals(StringFactory.EMPTY_STRING)) {
throw ExceptionFactory.propertyKeyCanNotBeEmpty();
}
}
}