mirror of
https://github.com/JHUAPL/AccumuloGraph.git
synced 2026-01-09 20:57:55 -05:00
Added functionality to reuse Connector, and prohibit configuration changes after use
This commit is contained in:
@@ -42,6 +42,8 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
|
||||
import org.apache.commons.configuration.AbstractConfiguration;
|
||||
import org.apache.commons.configuration.Configuration;
|
||||
import org.apache.commons.configuration.PropertiesConfiguration;
|
||||
import org.apache.commons.configuration.event.ConfigurationEvent;
|
||||
import org.apache.commons.configuration.event.ConfigurationListener;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import com.tinkerpop.blueprints.Graph;
|
||||
@@ -62,7 +64,13 @@ implements Serializable {
|
||||
/**
|
||||
* Backing configuration object.
|
||||
*/
|
||||
private Configuration conf;
|
||||
private PropertiesConfiguration conf;
|
||||
|
||||
/**
|
||||
* Maintain a connector instance that will be
|
||||
* reused for {@link #getConnector()}.
|
||||
*/
|
||||
private Connector connector;
|
||||
|
||||
/**
|
||||
* Temp directory used by getInstance when a Mini InstanceType is used.
|
||||
@@ -189,6 +197,15 @@ implements Serializable {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a copy of this configuration.
|
||||
* This is needed to change config parameters
|
||||
* if {@link #getConnector()} has been used.
|
||||
*/
|
||||
public AccumuloGraphConfiguration clone() {
|
||||
return new AccumuloGraphConfiguration(this);
|
||||
}
|
||||
|
||||
public boolean getCreate() {
|
||||
return conf.getBoolean(Keys.CREATE);
|
||||
}
|
||||
@@ -736,7 +753,7 @@ implements Serializable {
|
||||
|
||||
public String[] getPreloadedProperties() {
|
||||
return conf.containsKey(Keys.PRELOADED_PROPERTIES) ?
|
||||
conf.getStringArray(Keys.PRELOADED_PROPERTIES) : null;
|
||||
conf.getStringArray(Keys.PRELOADED_PROPERTIES) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -866,6 +883,7 @@ implements Serializable {
|
||||
|
||||
/**
|
||||
* Create a {@link Connector} from this configuration.
|
||||
* <p/>Note: Once this is called, the configuration may not be modified.
|
||||
* @return
|
||||
* @throws AccumuloException
|
||||
* @throws AccumuloSecurityException
|
||||
@@ -874,42 +892,56 @@ implements Serializable {
|
||||
*/
|
||||
public Connector getConnector() throws AccumuloException, AccumuloSecurityException,
|
||||
IOException, InterruptedException {
|
||||
Instance inst = null;
|
||||
switch (getInstanceType()) {
|
||||
case Distributed:
|
||||
if (getInstanceName() == null) {
|
||||
throw new IllegalArgumentException("Must specify instance name for distributed mode");
|
||||
} else if (getZooKeeperHosts() == null) {
|
||||
throw new IllegalArgumentException("Must specify ZooKeeper hosts for distributed mode");
|
||||
if (connector == null) {
|
||||
Instance inst = null;
|
||||
switch (getInstanceType()) {
|
||||
case Distributed:
|
||||
if (getInstanceName() == null) {
|
||||
throw new IllegalArgumentException("Must specify instance name for distributed mode");
|
||||
} else if (getZooKeeperHosts() == null) {
|
||||
throw new IllegalArgumentException("Must specify ZooKeeper hosts for distributed mode");
|
||||
}
|
||||
inst = new ZooKeeperInstance(getInstanceName(), getZooKeeperHosts());
|
||||
break;
|
||||
|
||||
case Mini:
|
||||
File dir = null;
|
||||
if (miniClusterTempDir == null) {
|
||||
dir = createTempDir();
|
||||
dir.deleteOnExit();
|
||||
} else {
|
||||
// already set by setMiniClusterTempDir(), It should be cleaned up outside of this class.
|
||||
dir = new File(miniClusterTempDir);
|
||||
}
|
||||
accumuloMiniCluster = new MiniAccumuloCluster(dir, ""); // conf.getString(PASSWORD)
|
||||
try {
|
||||
accumuloMiniCluster.start();
|
||||
} catch (Exception ex) {
|
||||
throw new AccumuloGraphException(ex);
|
||||
}
|
||||
inst = new ZooKeeperInstance(accumuloMiniCluster.getInstanceName(), accumuloMiniCluster.getZooKeepers());
|
||||
throw new UnsupportedOperationException("TODO");
|
||||
|
||||
case Mock:
|
||||
inst = new MockInstance(getInstanceName());
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new AccumuloGraphException("Unexpected instance type: " + inst);
|
||||
}
|
||||
|
||||
connector = inst.getConnector(getUser(), new PasswordToken(getPassword()));
|
||||
|
||||
// Make the configuration immutable.
|
||||
conf.addConfigurationListener(new ConfigurationListener() {
|
||||
@Override
|
||||
public void configurationChanged(ConfigurationEvent event) {
|
||||
throw new AccumuloGraphException("You may not modify the configuration after calling getConnector()");
|
||||
}
|
||||
inst = new ZooKeeperInstance(getInstanceName(), getZooKeeperHosts());
|
||||
break;
|
||||
case Mini:
|
||||
File dir = null;
|
||||
if (miniClusterTempDir == null) {
|
||||
dir = createTempDir();
|
||||
dir.deleteOnExit();
|
||||
} else {
|
||||
// already set by setMiniClusterTempDir(), It should be cleaned up outside of this class.
|
||||
dir = new File(miniClusterTempDir);
|
||||
}
|
||||
accumuloMiniCluster = new MiniAccumuloCluster(dir, ""); // conf.getString(PASSWORD)
|
||||
try {
|
||||
accumuloMiniCluster.start();
|
||||
} catch (Exception ex) {
|
||||
throw new AccumuloGraphException(ex);
|
||||
}
|
||||
inst = new ZooKeeperInstance(accumuloMiniCluster.getInstanceName(), accumuloMiniCluster.getZooKeepers());
|
||||
throw new UnsupportedOperationException("TODO");
|
||||
case Mock:
|
||||
inst = new MockInstance(getInstanceName());
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unexpected instance type: " + inst);
|
||||
});
|
||||
}
|
||||
|
||||
Connector c = inst.getConnector(getUser(), new PasswordToken(getPassword()));
|
||||
return c;
|
||||
return connector;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1021,7 +1053,7 @@ implements Serializable {
|
||||
checkPropertyValue(Keys.GRAPH_NAME, getGraphName(), false);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unexpected instance type: " + getInstanceType());
|
||||
throw new AccumuloGraphException("Unexpected instance type: " + getInstanceType());
|
||||
}
|
||||
|
||||
int timeout = getPropertyCacheTimeout(null);
|
||||
@@ -1046,7 +1078,7 @@ implements Serializable {
|
||||
+ "without first setting #propertyCacheTimeout(String property, int millis) "
|
||||
+ "to a positive value.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkPropertyValue(String prop, String val, boolean canBeEmpty) {
|
||||
if (val == null) {
|
||||
@@ -1091,7 +1123,7 @@ implements Serializable {
|
||||
try {
|
||||
keys.add((String) field.get(null));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
throw new AccumuloGraphException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1105,6 +1137,7 @@ implements Serializable {
|
||||
* @param zookeeperHosts
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public AccumuloGraphConfiguration setZookeeperHosts(String zookeeperHosts) {
|
||||
return setZooKeeperHosts(zookeeperHosts);
|
||||
}
|
||||
@@ -1114,6 +1147,7 @@ implements Serializable {
|
||||
* @param skip
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public AccumuloGraphConfiguration skipExistenceChecks(boolean skip) {
|
||||
return setSkipExistenceChecks(skip);
|
||||
}
|
||||
@@ -1122,6 +1156,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getAutoIndex()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isAutoIndex() {
|
||||
return getAutoIndex();
|
||||
}
|
||||
@@ -1130,6 +1165,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getIndexableGraphDisabled()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isIndexableGraphDisabled() {
|
||||
return getIndexableGraphDisabled();
|
||||
}
|
||||
@@ -1138,6 +1174,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getCreate()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isCreate() {
|
||||
return getCreate();
|
||||
}
|
||||
@@ -1146,6 +1183,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getClear()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isClear() {
|
||||
return getClear();
|
||||
}
|
||||
@@ -1154,6 +1192,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getInstanceName()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public String getInstance() {
|
||||
return getInstanceName();
|
||||
}
|
||||
@@ -1162,6 +1201,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getAutoFlush()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isAutoFlush() {
|
||||
return getAutoFlush();
|
||||
}
|
||||
@@ -1170,6 +1210,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getPropertyCacheTimeout(String)}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public Integer getPropertyCacheTimeoutMillis(String property) {
|
||||
return getPropertyCacheTimeout(property);
|
||||
}
|
||||
@@ -1178,6 +1219,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getEdgeCacheTimeout()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public Integer getEdgeCacheTimeoutMillis() {
|
||||
return getEdgeCacheTimeout();
|
||||
}
|
||||
@@ -1186,6 +1228,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getVertexCacheTimeout()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public Integer getVertexCacheTimeoutMillis() {
|
||||
return getVertexCacheTimeout();
|
||||
}
|
||||
@@ -1194,6 +1237,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getSkipExistenceChecks()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean skipExistenceChecks() {
|
||||
return getSkipExistenceChecks();
|
||||
}
|
||||
@@ -1202,6 +1246,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getGraphName()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public String getName() {
|
||||
return getGraphName();
|
||||
}
|
||||
@@ -1210,6 +1255,7 @@ implements Serializable {
|
||||
* @deprecated This is an old method name. Use {@link #getPreloadedEdgeLabels()}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
public String[] getPreloadedEdges() {
|
||||
return getPreloadedEdgeLabels();
|
||||
}
|
||||
|
||||
@@ -43,8 +43,7 @@ public class AccumuloBulkIngesterTest {
|
||||
ingester.addEdge("A", "B", "edge").add("P3", "V3").finish();
|
||||
ingester.shutdown(true);
|
||||
|
||||
cfg.setClear(false);
|
||||
AccumuloGraph graph = new AccumuloGraph(cfg);
|
||||
AccumuloGraph graph = new AccumuloGraph(cfg.clone().setClear(false));
|
||||
Vertex v1 = graph.getVertex("A");
|
||||
assertNotNull(v1);
|
||||
|
||||
|
||||
@@ -29,6 +29,8 @@ import org.junit.Test;
|
||||
import com.tinkerpop.blueprints.GraphFactory;
|
||||
import com.tinkerpop.blueprints.Vertex;
|
||||
|
||||
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
|
||||
|
||||
public class AccumuloGraphConfigurationTest {
|
||||
|
||||
@Test
|
||||
@@ -149,14 +151,14 @@ public class AccumuloGraphConfigurationTest {
|
||||
}
|
||||
graph.shutdown();
|
||||
|
||||
graph = new AccumuloGraph(cfg.setCreate(false));
|
||||
graph = new AccumuloGraph(cfg.clone().setCreate(false));
|
||||
assertTrue(graph.isEmpty());
|
||||
graph.addVertex("A");
|
||||
graph.addVertex("B");
|
||||
assertFalse(graph.isEmpty());
|
||||
graph.shutdown();
|
||||
|
||||
graph = new AccumuloGraph(cfg.setClear(true));
|
||||
graph = new AccumuloGraph(cfg.clone().setClear(true));
|
||||
assertTrue(graph.isEmpty());
|
||||
graph.shutdown();
|
||||
}
|
||||
@@ -269,4 +271,26 @@ public class AccumuloGraphConfigurationTest {
|
||||
fail();
|
||||
} catch (Exception e) { }
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testImmutableConnector() throws Exception {
|
||||
AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration().setInstanceType(
|
||||
InstanceType.Mock).setGraphName("immutableConnector")
|
||||
.setCreate(true).setAutoFlush(false);
|
||||
|
||||
cfg.getConnector();
|
||||
|
||||
try {
|
||||
cfg.setCreate(false);
|
||||
fail();
|
||||
} catch (Exception e) { }
|
||||
|
||||
try {
|
||||
cfg.setAutoFlush(true);
|
||||
fail();
|
||||
} catch (Exception e) { }
|
||||
|
||||
assertTrue(cfg.getCreate());
|
||||
assertFalse(cfg.getAutoFlush());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user