Add a Pig-based map/reduce system for filling the new query cache.

This commit is contained in:
Neil Williams
2012-06-07 11:29:50 -07:00
parent 89e0f79da3
commit 8fc311487d
15 changed files with 636 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
<project name="reddit Pig UDFs" default="dist" basedir=".">
<property name="src" location="src/" />
<property name="build" location="build/" />
<property name="dist" location="dist/" />
<property environment="env" />
<target name="init">
<tstamp />
<mkdir dir="${build}" />
</target>
<target name="compile" depends="init">
<javac srcdir="${src}" destdir="${build}" includeantruntime="false">
<classpath>
<pathelement location="${env.PIG_HOME}/pig.jar" />
</classpath>
</javac>
</target>
<target name="dist" depends="compile">
<mkdir dir="${dist}/lib" />
<jar jarfile="${dist}/lib/reddit-pig-udfs.jar" basedir="${build}" />
</target>
<target name="clean">
<delete dir="${build}" />
<delete dir="${dist}" />
</target>
</project>

View File

@@ -0,0 +1,22 @@
package com.reddit.pig;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class MAKE_FULLNAME extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
String name = (String)input.get(0);
Long id = (Long)input.get(1);
TypeID typeId = this.getTypeID(name);
if (typeId == null)
return null;
return String.format("t%d_%s", typeId.ordinal(), Long.toString(id, 36));
}
private TypeID getTypeID(String thingName) {
String enumName = thingName.toUpperCase();
return TypeID.valueOf(enumName);
}
}

View File

@@ -0,0 +1,26 @@
package com.reddit.pig;
import java.util.Map;
import java.util.HashMap;
import java.lang.System;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
public class MAKE_MAP extends EvalFunc<Map<Object, Object>> {
public Map<Object, Object> exec(Tuple input) throws IOException {
Map<Object, Object> map = new HashMap<Object, Object>();
DataBag bag = (DataBag)input.get(0);
for (Tuple tuple : bag) {
String key = (String)tuple.get(0);
Object value = tuple.get(1);
map.put(key, value.toString());
}
return map;
}
}

View File

@@ -0,0 +1,59 @@
package com.reddit.pig;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class MAKE_ROWKEY extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
String rel = (String)input.get(0);
String name = (String)input.get(1);
Long id = (Long)input.get(2);
String queryName = MAKE_ROWKEY.getQueryName(rel, name);
if (queryName == null) {
return null;
}
return queryName + "." + Long.toString(id, 36);
}
private static String getQueryName(String rel, String name) {
if (name.equals("1")) {
if (rel.equals("vote_account_link"))
return "liked";
} else if (name.equals("-1")) {
if (rel.equals("vote_account_link"))
return "disliked";
} else if (name.equals("save")) {
if (rel.equals("savehide"))
return "saved";
} else if (name.equals("hide")) {
if (rel.equals("savehide"))
return "hidden";
} else if (name.equals("inbox")) {
if (rel.equals("inbox_account_comment")) {
return "inbox_comments";
} else if (rel.equals("inbox_account_message")) {
return "inbox_messages";
} else if (rel.equals("moderatorinbox")) {
return "subreddit_messages";
} else if (rel.equals("inbox_account_comment:unread")) {
return "unread_comments";
} else if (rel.equals("inbox_account_message:unread")) {
return "unread_messages";
} else if (rel.equals("moderatorinbox:unread")) {
return "unread_subreddit_messages";
}
} else if (name.equals("selfreply")) {
if (rel.equals("inbox_account_comment")) {
return "inbox_selfreply";
} else if (rel.equals("inbox_account_comment:unread")) {
return "unread_selfreply";
}
}
return null;
}
}

View File

@@ -0,0 +1,24 @@
package com.reddit.pig;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class MAKE_THING2_FULLNAME extends MAKE_FULLNAME {
private TypeID getTypeID(String rel) {
if (rel.equals("savehide")) {
return TypeID.LINK;
} else if (rel.startsWith("inbox_account_comment")) {
return TypeID.COMMENT;
} else if (rel.startsWith("inbox_account_message")) {
return TypeID.MESSAGE;
} else if (rel.startsWith("moderatorinbox")) {
return TypeID.MESSAGE;
} else if (rel.equals("vote_account_link")) {
return TypeID.LINK;
}
return null;
}
}

View File

@@ -0,0 +1,22 @@
package com.reddit.pig;
import java.lang.Number;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class TO_36 extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
Object obj = input.get(0);
Number number;
if (obj instanceof Number) {
number = (Number)obj;
} else {
number = Long.decode(obj.toString());
}
return Long.toString(number.longValue(), 36);
}
}

View File

@@ -0,0 +1,34 @@
package com.reddit.pig;
import java.lang.*;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
public class TO_JSON extends EvalFunc<String> {
public String exec(Tuple input) throws IOException {
StringBuilder builder = new StringBuilder("[");
int size = input.size();
for (int i = 0; i < size; i++) {
Object obj = input.get(i);
// TODO: support integers and strings
if (obj == null) {
builder.append("null");
} else if (obj instanceof Double || obj instanceof Float) {
String formatted = String.format("%f", obj);
builder.append(formatted);
} else {
throw new UnsupportedOperationException("can only encode nulls and floating point numbers");
}
if (i + 1 != size)
builder.append(",");
}
builder.append("]");
return builder.toString();
}
}

View File

@@ -0,0 +1,12 @@
package com.reddit.pig;
// customize these to match your instance's typeids
enum TypeID {
INVALID,
COMMENT,
ACCOUNT,
LINK,
MESSAGE,
SUBREDDIT,
}