mirror of
https://github.com/JHUAPL/AccumuloGraph.git
synced 2026-01-09 12:47:56 -05:00
Merge pull request #81 from JHUAPL/configuration-cleanup
Configuration cleanup
This commit is contained in:
@@ -223,10 +223,12 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
config.validate();
|
||||
this.config = config;
|
||||
|
||||
if (config.useLruCache()) {
|
||||
vertexCache = new LruElementCache<Vertex>(config.getLruMaxCapacity(), config.getVertexCacheTimeoutMillis());
|
||||
if (config.getLruCacheEnabled()) {
|
||||
vertexCache = new LruElementCache<Vertex>(config.getLruMaxCapacity(),
|
||||
config.getVertexCacheTimeout());
|
||||
|
||||
edgeCache = new LruElementCache<Edge>(config.getLruMaxCapacity(), config.getEdgeCacheTimeoutMillis());
|
||||
edgeCache = new LruElementCache<Edge>(config.getLruMaxCapacity(),
|
||||
config.getEdgeCacheTimeout());
|
||||
}
|
||||
|
||||
AccumuloGraphUtils.handleCreateAndClear(config);
|
||||
@@ -395,7 +397,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
|
||||
Vertex vert = null;
|
||||
if (!config.skipExistenceChecks()) {
|
||||
if (!config.getSkipExistenceChecks()) {
|
||||
vert = getVertex(myID);
|
||||
if (vert != null) {
|
||||
ExceptionFactory.vertexWithIdAlreadyExists(myID);
|
||||
@@ -444,7 +446,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
|
||||
Scanner scan = null;
|
||||
try {
|
||||
if (!config.skipExistenceChecks()) {
|
||||
if (!config.getSkipExistenceChecks()) {
|
||||
// in addition to just an "existence" check, we will also load
|
||||
// any "preloaded" properties now, which saves us a round-trip
|
||||
// to Accumulo later...
|
||||
@@ -490,7 +492,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
if (vertexCache != null) {
|
||||
vertexCache.remove(vertex.getId());
|
||||
}
|
||||
if (!config.isIndexableGraphDisabled())
|
||||
if (!config.getIndexableGraphDisabled())
|
||||
clearIndex(vertex.getId());
|
||||
|
||||
Scanner scan = getElementScanner(Vertex.class);
|
||||
@@ -621,7 +623,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
|
||||
public Iterable<Vertex> getVertices(String key, Object value) {
|
||||
checkProperty(key, value);
|
||||
if (config.isAutoIndex() || getIndexedKeys(Vertex.class).contains(key)) {
|
||||
if (config.getAutoIndex() || getIndexedKeys(Vertex.class).contains(key)) {
|
||||
// Use the index
|
||||
Scanner s = getVertexIndexScanner();
|
||||
byte[] val = AccumuloByteSerializer.serialize(value);
|
||||
@@ -641,7 +643,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
|
||||
v = (v == null ? new AccumuloVertex(AccumuloGraph.this, key.getColumnQualifier().toString()) : v);
|
||||
int timeout = config.getPropertyCacheTimeoutMillis(key.getColumnFamily().toString());
|
||||
int timeout = config.getPropertyCacheTimeout(key.getColumnFamily().toString());
|
||||
if (timeout != -1) {
|
||||
v.cacheProperty(key.getColumnFamily().toString(), AccumuloByteSerializer.desserialize(key.getRow().getBytes()), timeout);
|
||||
}
|
||||
@@ -675,7 +677,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
|
||||
v = (v == null ? new AccumuloVertex(AccumuloGraph.this, kv.getKey().getRow().toString()) : v);
|
||||
int timeout = config.getPropertyCacheTimeoutMillis(kv.getKey().getColumnFamily().toString());
|
||||
int timeout = config.getPropertyCacheTimeout(kv.getKey().getColumnFamily().toString());
|
||||
if (timeout != -1) {
|
||||
v.cacheProperty(kv.getKey().getColumnFamily().toString(), AccumuloByteSerializer.desserialize(kv.getValue().get()), timeout);
|
||||
}
|
||||
@@ -755,7 +757,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
}
|
||||
Scanner s;
|
||||
if (!config.skipExistenceChecks()) {
|
||||
if (!config.getSkipExistenceChecks()) {
|
||||
s = getElementScanner(Edge.class);
|
||||
s.setRange(new Range(myID, myID));
|
||||
s.fetchColumnFamily(TLABEL);
|
||||
@@ -791,7 +793,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
Entry<Key,Value> entry = iter.next();
|
||||
Key key = entry.getKey();
|
||||
String attr = key.getColumnFamily().toString();
|
||||
Integer timeout = config.getPropertyCacheTimeoutMillis(attr);
|
||||
Integer timeout = config.getPropertyCacheTimeout(attr);
|
||||
if (SLABEL.equals(attr)) {
|
||||
if (!key.getColumnQualifier().toString().equals(SEXISTS)) {
|
||||
AccumuloEdge edge = (AccumuloEdge) e;
|
||||
@@ -809,7 +811,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
|
||||
public void removeEdge(Edge edge) {
|
||||
if (!config.isIndexableGraphDisabled())
|
||||
if (!config.getIndexableGraphDisabled())
|
||||
clearIndex(edge.getId());
|
||||
|
||||
if (edgeCache != null) {
|
||||
@@ -906,7 +908,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
key = SLABEL;
|
||||
}
|
||||
|
||||
if (config.isAutoIndex() || getIndexedKeys(Edge.class).contains(key)) {
|
||||
if (config.getAutoIndex() || getIndexedKeys(Edge.class).contains(key)) {
|
||||
// Use the index
|
||||
Scanner s = getEdgeIndexScanner();
|
||||
byte[] val = AccumuloByteSerializer.serialize(value);
|
||||
@@ -924,7 +926,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
e = (e == null ? new AccumuloEdge(AccumuloGraph.this, kv.getKey().getColumnQualifier().toString()) : e);
|
||||
|
||||
int timeout = config.getPropertyCacheTimeoutMillis(kv.getKey().getColumnFamily().toString());
|
||||
int timeout = config.getPropertyCacheTimeout(kv.getKey().getColumnFamily().toString());
|
||||
if (timeout != -1) {
|
||||
e.cacheProperty(kv.getKey().getColumnFamily().toString(), AccumuloByteSerializer.desserialize(kv.getKey().getRow().getBytes()), timeout);
|
||||
}
|
||||
@@ -1033,7 +1035,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
}
|
||||
|
||||
private void checkedFlush() {
|
||||
if (config.isAutoFlush()) {
|
||||
if (config.getAutoFlush()) {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
@@ -1058,7 +1060,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
toRet = AccumuloByteSerializer.desserialize(iter.next().getValue().get());
|
||||
}
|
||||
s.close();
|
||||
return new Pair<Integer,T>(config.getPropertyCacheTimeoutMillis(key), toRet);
|
||||
return new Pair<Integer,T>(config.getPropertyCacheTimeout(key), toRet);
|
||||
}
|
||||
|
||||
void preloadProperties(AccumuloElement element, Class<? extends Element> type) {
|
||||
@@ -1087,7 +1089,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
Entry<Key,Value> entry = iter.next();
|
||||
Object val = AccumuloByteSerializer.desserialize(entry.getValue().get());
|
||||
element
|
||||
.cacheProperty(entry.getKey().getColumnFamily().toString(), val, config.getPropertyCacheTimeoutMillis(entry.getKey().getColumnFamily().toString()));
|
||||
.cacheProperty(entry.getKey().getColumnFamily().toString(), val, config.getPropertyCacheTimeout(entry.getKey().getColumnFamily().toString()));
|
||||
}
|
||||
s.close();
|
||||
}
|
||||
@@ -1125,7 +1127,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
byte[] newByteVal = AccumuloByteSerializer.serialize(val);
|
||||
Mutation m = null;
|
||||
|
||||
if (config.isAutoIndex() || getIndexedKeys(type).contains(key)) {
|
||||
if (config.getAutoIndex() || getIndexedKeys(type).contains(key)) {
|
||||
BatchWriter bw = getIndexBatchWriter(type);
|
||||
Object old = getProperty(type, id, key).getSecond();
|
||||
if (old != null) {
|
||||
@@ -1148,7 +1150,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
} catch (MutationsRejectedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return config.getPropertyCacheTimeoutMillis(key);
|
||||
return config.getPropertyCacheTimeout(key);
|
||||
}
|
||||
|
||||
private BatchWriter getBatchWriter(Class<? extends Element> type) {
|
||||
@@ -1327,7 +1329,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
if (indexClass == null) {
|
||||
throw ExceptionFactory.classForElementCannotBeNull();
|
||||
}
|
||||
if (config.isIndexableGraphDisabled())
|
||||
if (config.getIndexableGraphDisabled())
|
||||
throw new UnsupportedOperationException("IndexableGraph is disabled via the configuration");
|
||||
|
||||
Scanner s = this.getMetadataScanner();
|
||||
@@ -1354,7 +1356,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
if (indexClass == null) {
|
||||
throw ExceptionFactory.classForElementCannotBeNull();
|
||||
}
|
||||
if (config.isIndexableGraphDisabled())
|
||||
if (config.getIndexableGraphDisabled())
|
||||
throw new UnsupportedOperationException("IndexableGraph is disabled via the configuration");
|
||||
|
||||
Scanner scan = getScanner(config.getMetadataTable());
|
||||
@@ -1378,7 +1380,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
|
||||
@Override
|
||||
public Iterable<Index<? extends Element>> getIndices() {
|
||||
if (config.isIndexableGraphDisabled())
|
||||
if (config.getIndexableGraphDisabled())
|
||||
throw new UnsupportedOperationException("IndexableGraph is disabled via the configuration");
|
||||
List<Index<? extends Element>> toRet = new ArrayList<Index<? extends Element>>();
|
||||
Scanner scan = getScanner(config.getMetadataTable());
|
||||
@@ -1406,7 +1408,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
|
||||
@Override
|
||||
public void dropIndex(String indexName) {
|
||||
if (config.isIndexableGraphDisabled())
|
||||
if (config.getIndexableGraphDisabled())
|
||||
throw new UnsupportedOperationException("IndexableGraph is disabled via the configuration");
|
||||
BatchDeleter deleter = null;
|
||||
try {
|
||||
@@ -1415,7 +1417,7 @@ public class AccumuloGraph implements Graph, KeyIndexableGraph, IndexableGraph {
|
||||
config.getBatchWriterConfig());
|
||||
deleter.setRanges(Collections.singleton(new Range(indexName)));
|
||||
deleter.delete();
|
||||
config.getConnector().tableOperations().delete(config.getName() + "_index_" + indexName);
|
||||
config.getConnector().tableOperations().delete(config.getGraphName() + "_index_" + indexName);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -65,17 +65,17 @@ final class AccumuloGraphUtils {
|
||||
|
||||
// Check edge cases.
|
||||
// No tables exist, and we are not allowed to create.
|
||||
if (!existedBeforeClear && !cfg.isCreate()) {
|
||||
if (!existedBeforeClear && !cfg.getCreate()) {
|
||||
throw new IllegalArgumentException("Graph does not exist, and create option is disabled");
|
||||
}
|
||||
// Tables exist, and we are not clearing them.
|
||||
else if (existedBeforeClear && !cfg.isClear()) {
|
||||
else if (existedBeforeClear && !cfg.getClear()) {
|
||||
// Do nothing.
|
||||
return;
|
||||
}
|
||||
|
||||
// We want to clear tables, so do it.
|
||||
if (cfg.isClear()) {
|
||||
if (cfg.getClear()) {
|
||||
for (String table : cfg.getTableNames()) {
|
||||
if (tableOps.exists(table)) {
|
||||
tableOps.delete(table);
|
||||
@@ -84,7 +84,7 @@ final class AccumuloGraphUtils {
|
||||
}
|
||||
|
||||
// Tables existed, or we want to create them. So do it.
|
||||
if (existedBeforeClear || cfg.isCreate()) {
|
||||
if (existedBeforeClear || cfg.getCreate()) {
|
||||
for (String table : cfg.getTableNames()) {
|
||||
if (!tableOps.exists(table)) {
|
||||
tableOps.create(table);
|
||||
|
||||
@@ -48,7 +48,7 @@ public class AccumuloIndex<T extends Element> implements Index<T> {
|
||||
indexedType = t;
|
||||
this.parent = parent;
|
||||
this.indexName = indexName;
|
||||
tableName = parent.config.getName() + "_index_" + indexName;// + "_" +
|
||||
tableName = parent.config.getGraphName() + "_index_" + indexName;// + "_" +
|
||||
// t;
|
||||
|
||||
try {
|
||||
|
||||
@@ -26,6 +26,10 @@ import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
|
||||
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
|
||||
|
||||
public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
|
||||
|
||||
private static final String PREFIX = EdgeInputFormat.class.getSimpleName()+".";
|
||||
private static final String GRAPH_NAME = PREFIX+"graph.name";
|
||||
|
||||
static AccumuloGraphConfiguration conf;
|
||||
|
||||
@Override
|
||||
@@ -50,11 +54,11 @@ public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
|
||||
|
||||
try {
|
||||
conf = new AccumuloGraphConfiguration();
|
||||
conf.setZookeeperHosts(EdgeInputFormat.getInstance(attempt).getZooKeepers());
|
||||
conf.setZooKeeperHosts(EdgeInputFormat.getInstance(attempt).getZooKeepers());
|
||||
conf.setInstanceName(EdgeInputFormat.getInstance(attempt).getInstanceName());
|
||||
conf.setUser(EdgeInputFormat.getPrincipal(attempt));
|
||||
conf.setPassword(EdgeInputFormat.getToken(attempt));
|
||||
conf.setGraphName(attempt.getConfiguration().get(AccumuloGraphConfiguration.GRAPH_NAME));
|
||||
conf.setGraphName(attempt.getConfiguration().get(GRAPH_NAME));
|
||||
if (EdgeInputFormat.getInstance(attempt) instanceof MockInstance) {
|
||||
conf.setInstanceType(InstanceType.Mock);
|
||||
}
|
||||
@@ -107,11 +111,11 @@ public class EdgeInputFormat extends InputFormatBase<Text,Edge> {
|
||||
EdgeInputFormat.setConnectorInfo(job, cfg.getUser(), new PasswordToken(cfg.getPassword()));
|
||||
EdgeInputFormat.setInputTableName(job, cfg.getEdgeTable());
|
||||
if (cfg.getInstanceType().equals(InstanceType.Mock)) {
|
||||
EdgeInputFormat.setMockInstance(job, cfg.getInstance());
|
||||
EdgeInputFormat.setMockInstance(job, cfg.getInstanceName());
|
||||
} else {
|
||||
EdgeInputFormat.setZooKeeperInstance(job, cfg.getInstance(), cfg.getZooKeeperHosts());
|
||||
EdgeInputFormat.setZooKeeperInstance(job, cfg.getInstanceName(), cfg.getZooKeeperHosts());
|
||||
}
|
||||
job.getConfiguration().set(AccumuloGraphConfiguration.GRAPH_NAME, cfg.getName());
|
||||
job.getConfiguration().set(GRAPH_NAME, cfg.getGraphName());
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,14 @@ import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
|
||||
|
||||
public class ElementOutputFormat extends OutputFormat<NullWritable,Element> {
|
||||
|
||||
private static final String PREFIX = ElementOutputFormat.class.getSimpleName()+".";
|
||||
private static final String USER = "username";
|
||||
private static final String PASSWORD = PREFIX+"password";
|
||||
private static final String GRAPH_NAME = PREFIX+"graphName";
|
||||
private static final String INSTANCE = PREFIX+"instanceName";
|
||||
private static final String INSTANCE_TYPE = PREFIX+"instanceType";
|
||||
private static final String ZK_HOSTS = PREFIX+"zookeeperHosts";
|
||||
|
||||
@Override
|
||||
public RecordWriter<NullWritable,Element> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
|
||||
return new ElementRecordWriter(context);
|
||||
@@ -43,13 +51,13 @@ public class ElementOutputFormat extends OutputFormat<NullWritable,Element> {
|
||||
acc.validate();
|
||||
Configuration jobconf = job.getConfiguration();
|
||||
|
||||
jobconf.set(AccumuloGraphConfiguration.USER, acc.getUser());
|
||||
jobconf.set(AccumuloGraphConfiguration.PASSWORD, new String(acc.getPassword().array()));
|
||||
jobconf.set(AccumuloGraphConfiguration.GRAPH_NAME, acc.getName());
|
||||
jobconf.set(AccumuloGraphConfiguration.INSTANCE, acc.getInstance());
|
||||
jobconf.set(AccumuloGraphConfiguration.INSTANCE_TYPE, acc.getInstanceType().toString());
|
||||
jobconf.set(USER, acc.getUser());
|
||||
jobconf.set(PASSWORD, new String(acc.getPassword().array()));
|
||||
jobconf.set(GRAPH_NAME, acc.getGraphName());
|
||||
jobconf.set(INSTANCE, acc.getInstanceName());
|
||||
jobconf.set(INSTANCE_TYPE, acc.getInstanceType().toString());
|
||||
if(acc.getInstanceType().equals(InstanceType.Distributed))
|
||||
jobconf.set(AccumuloGraphConfiguration.ZK_HOSTS, acc.getZooKeeperHosts());
|
||||
jobconf.set(ZK_HOSTS, acc.getZooKeeperHosts());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -67,12 +75,12 @@ public class ElementOutputFormat extends OutputFormat<NullWritable,Element> {
|
||||
protected ElementRecordWriter(TaskAttemptContext context) {
|
||||
config = new AccumuloGraphConfiguration();
|
||||
Configuration jobconf = context.getConfiguration();
|
||||
config.setUser(jobconf.get(AccumuloGraphConfiguration.USER));
|
||||
config.setPassword(jobconf.get(AccumuloGraphConfiguration.PASSWORD));
|
||||
config.setGraphName(jobconf.get(AccumuloGraphConfiguration.GRAPH_NAME));
|
||||
config.setInstanceName(jobconf.get(AccumuloGraphConfiguration.INSTANCE));
|
||||
config.setInstanceType(InstanceType.valueOf(jobconf.get(AccumuloGraphConfiguration.INSTANCE_TYPE)));
|
||||
config.setZookeeperHosts(jobconf.get(AccumuloGraphConfiguration.ZK_HOSTS));
|
||||
config.setUser(jobconf.get(USER));
|
||||
config.setPassword(jobconf.get(PASSWORD));
|
||||
config.setGraphName(jobconf.get(GRAPH_NAME));
|
||||
config.setInstanceName(jobconf.get(INSTANCE));
|
||||
config.setInstanceType(InstanceType.valueOf(jobconf.get(INSTANCE_TYPE)));
|
||||
config.setZooKeeperHosts(jobconf.get(ZK_HOSTS));
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -12,14 +12,12 @@ import org.apache.accumulo.core.client.mock.MockInstance;
|
||||
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
|
||||
import org.apache.accumulo.core.data.Key;
|
||||
import org.apache.accumulo.core.data.Value;
|
||||
import org.apache.commons.configuration.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.RecordReader;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
|
||||
import com.tinkerpop.blueprints.Graph;
|
||||
import com.tinkerpop.blueprints.Vertex;
|
||||
|
||||
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
|
||||
@@ -30,8 +28,12 @@ import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
|
||||
public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
|
||||
static AccumuloGraphConfiguration conf;
|
||||
|
||||
private static final String PREFIX = VertexInputFormat.class.getSimpleName()+".";
|
||||
private static final String GRAPH_NAME = PREFIX+"graph.name";
|
||||
|
||||
@Override
|
||||
public RecordReader<Text,Vertex> createRecordReader(InputSplit split, TaskAttemptContext attempt) throws IOException, InterruptedException {
|
||||
public RecordReader<Text,Vertex> createRecordReader(InputSplit split,
|
||||
TaskAttemptContext attempt) throws IOException, InterruptedException {
|
||||
return new VertexRecordReader();
|
||||
}
|
||||
|
||||
@@ -52,11 +54,11 @@ public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
|
||||
|
||||
try {
|
||||
conf = new AccumuloGraphConfiguration();
|
||||
conf.setZookeeperHosts(VertexInputFormat.getInstance(attempt).getZooKeepers());
|
||||
conf.setZooKeeperHosts(VertexInputFormat.getInstance(attempt).getZooKeepers());
|
||||
conf.setInstanceName(VertexInputFormat.getInstance(attempt).getInstanceName());
|
||||
conf.setUser(VertexInputFormat.getPrincipal(attempt));
|
||||
conf.setPassword(VertexInputFormat.getToken(attempt));
|
||||
conf.setGraphName(attempt.getConfiguration().get(AccumuloGraphConfiguration.GRAPH_NAME));
|
||||
conf.setGraphName(attempt.getConfiguration().get(GRAPH_NAME));
|
||||
if (VertexInputFormat.getInstance(attempt) instanceof MockInstance) {
|
||||
conf.setInstanceType(InstanceType.Mock);
|
||||
}
|
||||
@@ -116,11 +118,11 @@ public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
|
||||
VertexInputFormat.setConnectorInfo(job, cfg.getUser(), new PasswordToken(cfg.getPassword()));
|
||||
VertexInputFormat.setInputTableName(job, cfg.getVertexTable());
|
||||
if (cfg.getInstanceType().equals(InstanceType.Mock)) {
|
||||
VertexInputFormat.setMockInstance(job, cfg.getInstance());
|
||||
VertexInputFormat.setMockInstance(job, cfg.getInstanceName());
|
||||
} else {
|
||||
VertexInputFormat.setZooKeeperInstance(job, cfg.getInstance(), cfg.getZooKeeperHosts());
|
||||
VertexInputFormat.setZooKeeperInstance(job, cfg.getInstanceName(), cfg.getZooKeeperHosts());
|
||||
}
|
||||
job.getConfiguration().set(AccumuloGraphConfiguration.GRAPH_NAME, cfg.getName());
|
||||
job.getConfiguration().set(GRAPH_NAME, cfg.getGraphName());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
/* Copyright 2014 The Johns Hopkins University Applied Physics Laboratory
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package edu.jhuapl.tinkerpop;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import com.tinkerpop.blueprints.Direction;
|
||||
import com.tinkerpop.blueprints.Edge;
|
||||
import com.tinkerpop.blueprints.Vertex;
|
||||
|
||||
public class AccumuloBulkIngesterTest {
|
||||
|
||||
@Test
|
||||
public void testBulkIngester() throws Exception {
|
||||
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils.generateGraphConfig("propertyBuilder").setClear(true);
|
||||
|
||||
AccumuloBulkIngester ingester = new AccumuloBulkIngester(cfg);
|
||||
|
||||
for (String t : cfg.getTableNames()) {
|
||||
assertTrue(cfg.getConnector().tableOperations().exists(t));
|
||||
}
|
||||
|
||||
ingester.addVertex("A").finish();
|
||||
ingester.addVertex("B").add("P1", "V1").add("P2", "2").finish();
|
||||
ingester.addEdge("A", "B", "edge").add("P3", "V3").finish();
|
||||
ingester.shutdown(true);
|
||||
|
||||
cfg.setClear(false);
|
||||
AccumuloGraph graph = new AccumuloGraph(cfg);
|
||||
Vertex v1 = graph.getVertex("A");
|
||||
assertNotNull(v1);
|
||||
|
||||
Iterator<Edge> it = v1.getEdges(Direction.OUT).iterator();
|
||||
assertTrue(it.hasNext());
|
||||
|
||||
Edge e = it.next();
|
||||
assertEquals("edge", e.getLabel());
|
||||
|
||||
Vertex v2 = e.getVertex(Direction.IN);
|
||||
assertEquals("B", v2.getId());
|
||||
assertEquals("V1", v2.getProperty("P1"));
|
||||
assertEquals("2", v2.getProperty("P2"));
|
||||
|
||||
graph.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -18,7 +18,6 @@ import static org.junit.Assert.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import javax.xml.namespace.QName;
|
||||
@@ -26,8 +25,6 @@ import javax.xml.namespace.QName;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.tinkerpop.blueprints.Direction;
|
||||
import com.tinkerpop.blueprints.Edge;
|
||||
import com.tinkerpop.blueprints.GraphFactory;
|
||||
import com.tinkerpop.blueprints.Vertex;
|
||||
|
||||
@@ -149,36 +146,10 @@ public class AccumuloGraphConfigurationTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkIngester() throws Exception {
|
||||
AccumuloGraphConfiguration cfg = AccumuloGraphTestUtils.generateGraphConfig("propertyBuilder").setClear(true);
|
||||
|
||||
AccumuloBulkIngester ingester = new AccumuloBulkIngester(cfg);
|
||||
|
||||
for (String t : cfg.getTableNames()) {
|
||||
assertTrue(cfg.getConnector().tableOperations().exists(t));
|
||||
}
|
||||
|
||||
ingester.addVertex("A").finish();
|
||||
ingester.addVertex("B").add("P1", "V1").add("P2", "2").finish();
|
||||
ingester.addEdge("A", "B", "edge").add("P3", "V3").finish();
|
||||
ingester.shutdown(true);
|
||||
|
||||
cfg.setClear(false);
|
||||
AccumuloGraph graph = new AccumuloGraph(cfg);
|
||||
Vertex v1 = graph.getVertex("A");
|
||||
assertNotNull(v1);
|
||||
|
||||
Iterator<Edge> it = v1.getEdges(Direction.OUT).iterator();
|
||||
assertTrue(it.hasNext());
|
||||
|
||||
Edge e = it.next();
|
||||
assertEquals("edge", e.getLabel());
|
||||
|
||||
Vertex v2 = e.getVertex(Direction.IN);
|
||||
assertEquals("B", v2.getId());
|
||||
assertEquals("V1", v2.getProperty("P1"));
|
||||
assertEquals("2", v2.getProperty("P2"));
|
||||
|
||||
graph.shutdown();
|
||||
public void testPrint() throws Exception {
|
||||
AccumuloGraphConfiguration cfg =
|
||||
AccumuloGraphTestUtils.generateGraphConfig("printTest");
|
||||
cfg.print();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ public class AccumuloGraphTestUtils {
|
||||
|
||||
public static AccumuloGraphConfiguration generateGraphConfig(String graphDirectoryName) {
|
||||
AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration();
|
||||
cfg.setInstanceName("instanceName").setZookeeperHosts("ZookeeperHostsString");
|
||||
cfg.setInstanceName("instanceName").setZooKeeperHosts("ZookeeperHostsString");
|
||||
cfg.setUser("root").setPassword("".getBytes());
|
||||
cfg.setGraphName(graphDirectoryName).setCreate(true).setAutoFlush(true).setInstanceType(InstanceType.Mock);
|
||||
return cfg;
|
||||
|
||||
Reference in New Issue
Block a user