Added ElementOutputFormat

Ryan Webb
2014-09-30 09:41:54 -04:00
parent 73cdc4e575
commit ce57ee552d
10 changed files with 1866 additions and 1603 deletions

View File

@@ -0,0 +1,118 @@
package edu.jhuapl.tinkerpop.mapreduce;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import com.tinkerpop.blueprints.Element;
import edu.jhuapl.tinkerpop.AccumuloByteSerializer;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
public class ElementOutputFormat extends OutputFormat&lt;NullWritable,Element&gt; {

  @Override
  public RecordWriter&lt;NullWritable,Element&gt; getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
    return new ElementRecordWriter(context);
  }

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
  }

  /**
   * Copy the connection settings from the given graph configuration into the job
   * configuration so the record writer can rebuild them on the task side.
   */
  public static void setAccumuloConfiguration(Job job, AccumuloGraphConfiguration acc) {
    acc.validate();
    Configuration jobconf = job.getConfiguration();
    jobconf.set(AccumuloGraphConfiguration.USER, acc.getUser());
    jobconf.set(AccumuloGraphConfiguration.PASSWORD, new String(acc.getPassword().array()));
    jobconf.set(AccumuloGraphConfiguration.GRAPH_NAME, acc.getName());
    jobconf.set(AccumuloGraphConfiguration.INSTANCE, acc.getInstance());
    jobconf.set(AccumuloGraphConfiguration.INSTANCE_TYPE, acc.getInstanceType().toString());
    jobconf.set(AccumuloGraphConfiguration.ZK_HOSTS, acc.getZooKeeperHosts());
  }

  /**
   * @see AccumuloOutputFormat
   */
  // TODO I think we can implement this to provide a little more robustness.
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
    return new NullOutputFormat&lt;Text,Mutation&gt;().getOutputCommitter(context);
  }

  class ElementRecordWriter extends RecordWriter&lt;NullWritable,Element&gt; {

    AccumuloGraphConfiguration config;
    BatchWriter bw;

    protected ElementRecordWriter(TaskAttemptContext context) {
      // Rebuild the graph configuration from the values stored by setAccumuloConfiguration.
      config = new AccumuloGraphConfiguration();
      Configuration jobconf = context.getConfiguration();
      config.setUser(jobconf.get(AccumuloGraphConfiguration.USER));
      config.setPassword(jobconf.get(AccumuloGraphConfiguration.PASSWORD));
      config.setGraphName(jobconf.get(AccumuloGraphConfiguration.GRAPH_NAME));
      config.setInstanceName(jobconf.get(AccumuloGraphConfiguration.INSTANCE));
      config.setInstanceType(InstanceType.valueOf(jobconf.get(AccumuloGraphConfiguration.INSTANCE_TYPE)));
      config.setZookeeperHosts(jobconf.get(AccumuloGraphConfiguration.ZK_HOSTS));
    }

    @Override
    public void write(NullWritable key, Element value) throws IOException, InterruptedException {
      MapReduceElement ele = (MapReduceElement) value;
      try {
        // Lazily open a batch writer against the vertex or edge table, depending on the element type.
        if (bw == null) {
          if (ele instanceof MapReduceVertex) {
            bw = config.getConnector().createBatchWriter(config.getVertexTable(), config.getBatchWriterConfig());
          } else {
            bw = config.getConnector().createBatchWriter(config.getEdgeTable(), config.getBatchWriterConfig());
          }
        }
        // Flush only the properties that were added during the job.
        Mutation mut = new Mutation(ele.id);
        for (Entry&lt;String,Object&gt; property : ele.getNewProperties().entrySet()) {
          mut.put(property.getKey().getBytes(), "".getBytes(), AccumuloByteSerializer.serialize(property.getValue()));
        }
        bw.addMutation(mut);
      } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
        throw new IOException(e);
      }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
      if (bw != null) {
        try {
          bw.close();
        } catch (MutationsRejectedException e) {
          // Surface rejected mutations instead of silently dropping them.
          throw new IOException(e);
        }
      }
    }
  }
}
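For orientation, the sketch below shows how a driver might wire this output format into a map-only job. It is not part of this commit: the class names (ElementUpdateDriver, TouchMapper), the property name, and the instance/user/graph/ZooKeeper values are placeholders, and it assumes VertexInputFormat exposes the same setAccumuloGraphConfiguration helper that EdgeInputFormat is shown using in the test later in this commit.

// Hypothetical driver sketch; connection values are placeholders.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
import edu.jhuapl.tinkerpop.mapreduce.ElementOutputFormat;
import edu.jhuapl.tinkerpop.mapreduce.VertexInputFormat;

public class ElementUpdateDriver {

  // Reads each vertex, buffers a new property on it, and emits it for ElementOutputFormat to flush.
  public static class TouchMapper extends Mapper&lt;Text,Vertex,NullWritable,Element&gt; {
    @Override
    protected void map(Text key, Vertex vertex, Context context) throws IOException, InterruptedException {
      vertex.setProperty("touched", "yes");   // buffered in newProperties, written by the output format
      context.write(NullWritable.get(), vertex);
    }
  }

  public static void main(String[] args) throws Exception {
    AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration()
        .setInstanceName("myInstance").setUser("root").setPassword("secret".getBytes())
        .setGraphName("myGraph").setInstanceType(InstanceType.Mock).setCreate(true);
    cfg.setZookeeperHosts("localhost:2181");   // placeholder value

    Job job = Job.getInstance(new Configuration(), "touch-elements");
    job.setJarByClass(ElementUpdateDriver.class);

    job.setInputFormatClass(VertexInputFormat.class);
    VertexInputFormat.setAccumuloGraphConfiguration(job, cfg);   // assumed to mirror EdgeInputFormat's helper

    job.setMapperClass(TouchMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Element.class);
    job.setNumReduceTasks(0);

    job.setOutputFormatClass(ElementOutputFormat.class);
    ElementOutputFormat.setAccumuloConfiguration(job, cfg);      // copies user/password/instance/graph into the job conf

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}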

View File

@@ -17,6 +17,7 @@ package edu.jhuapl.tinkerpop.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.Set;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.collect.Sets;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
@@ -36,6 +38,8 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
protected Map<String,Object> properties;
protected Map<String,Object> newProperties;
AccumuloGraph parent;
MapReduceElement(AccumuloGraph parent) {
@@ -51,6 +55,10 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
properties.put(key, property);
}
Map<String,Object> getNewProperties(){
return newProperties;
}
@Override
public Object getId() {
return id;
@@ -58,12 +66,17 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
@Override
public <T> T getProperty(String key) {
Object newProp = newProperties.get(key);
if (newProp != null)
return (T) newProp;
return (T) properties.get(key);
}
@Override
public Set<String> getPropertyKeys() {
- return new HashSet&lt;String&gt;(properties.keySet());
+ return Sets.union(new HashSet&lt;String&gt;(properties.keySet()),
+     new HashSet&lt;String&gt;(newProperties.keySet()));
}
@Override
@@ -78,7 +91,7 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
@Override
public void setProperty(String key, Object value) {
- throw new UnsupportedOperationException("You cannot modify an element during a MapReduce job.");
+ newProperties.put(key, value);
}
protected Graph getParent() {
@@ -98,6 +111,16 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
Object val = AccumuloByteSerializer.desserialize(data);
properties.put(key, val);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
String key = in.readUTF();
byte[] data = new byte[in.readInt()];
in.readFully(data);
Object val = AccumuloByteSerializer.desserialize(data);
newProperties.put(key, val);
}
}
@Override
@@ -110,6 +133,13 @@ public abstract class MapReduceElement implements Element, WritableComparable<Ma
out.writeInt(data.length);
out.write(data);
}
// Newly added properties are appended using the same wire format as the existing map.
out.writeInt(newProperties.size());
for (String key : newProperties.keySet()) {
out.writeUTF(key);
byte[] data = AccumuloByteSerializer.serialize(newProperties.get(key));
out.writeInt(data.length);
out.write(data);
}
}
@Override

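The newProperties block above follows the same wire format as the existing properties block: an entry count, then for each entry a writeUTF key and a length-prefixed value, mirrored by readInt/readFully on the read side. Below is a standalone sketch of that round trip; it uses plain byte[] values in place of AccumuloByteSerializer (which is outside this diff), and the class name is illustrative.

// Standalone illustration of the length-prefixed Writable pattern used above.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class LengthPrefixedMapDemo {

  static void writeMap(DataOutput out, Map&lt;String,byte[]&gt; props) throws IOException {
    out.writeInt(props.size());                 // entry count first
    for (Map.Entry&lt;String,byte[]&gt; e : props.entrySet()) {
      out.writeUTF(e.getKey());                 // property key
      out.writeInt(e.getValue().length);        // value length
      out.write(e.getValue());                  // value bytes
    }
  }

  static Map&lt;String,byte[]&gt; readMap(DataInput in) throws IOException {
    Map&lt;String,byte[]&gt; props = new LinkedHashMap&lt;String,byte[]&gt;();
    int count = in.readInt();
    for (int i = 0; i &lt; count; i++) {
      String key = in.readUTF();
      byte[] data = new byte[in.readInt()];
      in.readFully(data);
      props.put(key, data);
    }
    return props;
  }

  public static void main(String[] args) throws IOException {
    Map&lt;String,byte[]&gt; props = new LinkedHashMap&lt;String,byte[]&gt;();
    props.put("NAME", "BANANA1".getBytes(StandardCharsets.UTF_8));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    writeMap(new DataOutputStream(bytes), props);

    Map&lt;String,byte[]&gt; copy = readMap(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(new String(copy.get("NAME"), StandardCharsets.UTF_8));   // prints BANANA1
  }
}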
View File

@@ -0,0 +1,7 @@
package edu.jhuapl.tinkerpop.mapreduce;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
public class NewElementOutputFormat extends AccumuloOutputFormat {
}
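Since NewElementOutputFormat is an empty subclass, a job using it would be configured entirely through the static setters it inherits from Accumulo's AccumuloOutputFormat and would emit (Text table, Mutation) pairs directly. The sketch below is not part of this commit; the class name, user, instance, ZooKeeper, and table values are placeholders, and the calls are the standard AccumuloOutputFormat helpers from the 1.5/1.6-era API.

// Hypothetical job setup for the inherited AccumuloOutputFormat behavior; values are placeholders.
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import edu.jhuapl.tinkerpop.mapreduce.NewElementOutputFormat;

public class NewElementOutputSetup {

  public static void configure(Job job) throws Exception {
    job.setOutputFormatClass(NewElementOutputFormat.class);
    job.setOutputKeyClass(Text.class);        // table name; a null key routes to the default table
    job.setOutputValueClass(Mutation.class);  // raw Accumulo mutation

    // Standard AccumuloOutputFormat configuration, inherited unchanged by the empty subclass.
    AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken("secret"));
    AccumuloOutputFormat.setZooKeeperInstance(job, "myInstance", "zkhost:2181");
    AccumuloOutputFormat.setDefaultTableName(job, "myGraph_vertex");
    AccumuloOutputFormat.setCreateTables(job, true);
    AccumuloOutputFormat.setBatchWriterOptions(job, new BatchWriterConfig());
  }
}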

View File

@@ -120,7 +120,7 @@ public class VertexInputFormat extends InputFormatBase<Text,Vertex> {
} else {
VertexInputFormat.setZooKeeperInstance(job, cfg.getInstance(), cfg.getZooKeeperHosts());
}
job.getConfiguration().set("blueprints.accumulo.name", cfg.getName());
job.getConfiguration().set(AccumuloGraphConfiguration.GRAPH_NAME, cfg.getName());
}
}

View File

@@ -0,0 +1,109 @@
package edu.jhuapl.tinkerpop.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.Vertex;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration;
import edu.jhuapl.tinkerpop.AccumuloGraphConfiguration.InstanceType;
public class ElementOutputFormatTest {

  private static AssertionError e1 = null;
  private static AssertionError e2 = null;

  private static class MRTester extends Configured implements Tool {

    private static class TestVertexMapper extends Mapper&lt;Text,Vertex,NullWritable,Element&gt; {
      int count = 0;

      @Override
      protected void map(Text k, Vertex v, Context context) throws IOException, InterruptedException {
        try {
          assertEquals(k.toString(), v.getId().toString());
          v.setProperty("NAME", "BANANA" + v.getId());
          context.write(NullWritable.get(), v);
        } catch (AssertionError e) {
          e1 = e;
        }
        count++;
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
          assertEquals(100, count);
        } catch (AssertionError e) {
          e2 = e;
        }
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      setConf(new Configuration());
      getConf().set("fs.default.name", "local");

      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
      job.setJarByClass(this.getClass());

      AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration().setInstanceName("_mapreduce_instance2").setUser("root").setPassword("".getBytes())
          .setGraphName("_mapreduce_table_2").setInstanceType(InstanceType.Mock).setCreate(true);

      // The mapper consumes vertices, so read them with the vertex input format.
      job.setInputFormatClass(VertexInputFormat.class);
      VertexInputFormat.setAccumuloGraphConfiguration(job, cfg);

      job.setMapperClass(TestVertexMapper.class);
      job.setMapOutputKeyClass(NullWritable.class);
      job.setMapOutputValueClass(Element.class);
      job.setOutputFormatClass(ElementOutputFormat.class);
      job.setNumReduceTasks(0);

      job.waitForCompletion(true);

      return job.isSuccessful() ? 0 : 1;
    }

    public static int main(String[] args) throws Exception {
      return ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args);
    }
  }

  @Test
  public void testVertexInputMap() throws Exception {
    final String INSTANCE_NAME = "_mapreduce_instance2";
    final String TEST_TABLE_1 = "_mapreduce_table_2";

    if (!System.getProperty("os.name").startsWith("Windows")) {
      Graph g = GraphFactory.open(new AccumuloGraphConfiguration().setInstanceName(INSTANCE_NAME).setUser("root").setPassword("".getBytes())
          .setGraphName(TEST_TABLE_1).setInstanceType(InstanceType.Mock).setCreate(true).getConfiguration());
      for (int i = 0; i &lt; 100; i++) {
        g.addVertex(i + "");
      }
      assertEquals(0, MRTester.main(new String[] {}));
      assertNull(e1);
      assertNull(e2);
      assertEquals("BANANA1", g.getVertex("1").getProperty("NAME"));
    }
  }
}

View File

@@ -34,7 +34,6 @@ public class InputFormatsTest {
private static class MRTester extends Configured implements Tool {
private static class TestEdgeMapper extends Mapper<Text,Edge,NullWritable,NullWritable> {
- // Key key = null;
int count = 0;
@Override