reddit/scripts/migrate/regenerate-query-cache.py
Neil Williams 68291521b5 Move query_cache scripts to a more general "migrate" directory.
Not just for the query cache anymore!
2012-12-04 11:41:24 -08:00

#!/usr/bin/python
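# note: this uses Pig's embedded-scripting API, so it is meant to be run
# under Pig's Jython support (e.g. "pig regenerate-query-cache.py") rather
# than directly with CPython.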
from org.apache.pig.scripting import Pig

SCRIPT_ROOT = "udfs/dist/lib/"
INPUT_ROOT = "input/"
OUTPUT_ROOT = "output"

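# relation dump name -> (query cache column family, type of the rel's thing2)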
relations = {"savehide": ("UserQueryCache", "link"),
             "inbox_account_comment": ("UserQueryCache", "comment"),
             "inbox_account_message": ("UserQueryCache", "message"),
             "moderatorinbox": ("SubredditQueryCache", "message"),
             "vote_account_link": ("UserQueryCache", "link"),
             }

####### Pig script fragments
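# load a dump of things: one row per thing with its vote counts, state
# flags, and timestamp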
load_things = """
    things =
        LOAD '$THINGS'
        USING PigStorage()
        AS (id:long,
            ups:int,
            downs:int,
            deleted:chararray,
            spam:chararray,
            timestamp:double);
"""
make_things_items = """
    items =
        FOREACH things GENERATE *;
"""
load_rels = """
    items =
        LOAD '$RELS'
        USING PigStorage()
        AS (id:long,
            thing1_id:long,
            thing2_id:long,
            name:chararray,
            timestamp:double);
"""
load_and_map_data = """
    data =
        LOAD '$DATA'
        USING PigStorage()
        AS (id:long,
            key:chararray,
            value);

    grouped_with_data =
        COGROUP items BY id, data BY id;

    items_with_data =
        FOREACH grouped_with_data
        GENERATE FLATTEN(items),
                 com.reddit.pig.MAKE_MAP(data.(key, value)) AS data;
"""
add_unread = """
    SPLIT items_with_data
        INTO inbox IF 1 == 1,
             unread IF (chararray)data#'new' == 't';

    inbox_with_relname =
        FOREACH inbox GENERATE '$RELATION' AS relation, *;

    unread_with_relname =
        FOREACH unread GENERATE '$RELATION:unread' AS relation, *;

    rels_with_relname =
        UNION ONSCHEMA inbox_with_relname,
                       unread_with_relname;
"""
add_relname = """
    rels_with_relname =
        FOREACH items GENERATE '$RELATION' AS relation, *;
"""
generate_rel_items = """
    minimal_things =
        FOREACH things GENERATE id, deleted;

    joined =
        JOIN rels_with_relname BY thing2_id LEFT OUTER,
             minimal_things BY id;

    only_valid =
        FILTER joined BY minimal_things::id IS NOT NULL AND
                         deleted == 'f';

    potential_columns =
        FOREACH only_valid
        GENERATE com.reddit.pig.MAKE_ROWKEY(relation, name, thing1_id) AS rowkey,
                 com.reddit.pig.MAKE_THING2_FULLNAME(relation, thing2_id) AS colkey,
                 timestamp AS value;
"""
store_top_1000_per_rowkey = """
    non_null =
        FILTER potential_columns BY rowkey IS NOT NULL AND colkey IS NOT NULL;

    grouped =
        GROUP non_null BY rowkey;

    limited =
        FOREACH grouped {
            sorted = ORDER non_null BY value DESC;
            limited = LIMIT sorted 1000;
            GENERATE group AS rowkey, FLATTEN(limited.(colkey, value));
        };

    jsonified =
        FOREACH limited GENERATE rowkey,
                                 colkey,
                                 com.reddit.pig.TO_JSON(value);

    STORE jsonified INTO '$OUTPUT' USING PigStorage();
"""
###### run the jobs
# register the reddit udfs
Pig.registerJar(SCRIPT_ROOT + "reddit-pig-udfs.jar")
# process rels
for rel, (cf, thing2_type) in relations.iteritems():
    # build source for a script
    script = "SET default_parallel 10;"
    script += load_rels

    if "inbox" in rel:
        script += load_and_map_data
        script += add_unread
    else:
        script += add_relname

    script += load_things
    script += generate_rel_items
    script += store_top_1000_per_rowkey

    # run it
    compiled = Pig.compile(script)
    bound = compiled.bind({
        "RELS": INPUT_ROOT + rel + ".dump",
        "DATA": INPUT_ROOT + rel + "-data.dump",
        "THINGS": INPUT_ROOT + thing2_type + ".dump",
        "RELATION": rel,
        "OUTPUT": "/".join((OUTPUT_ROOT, cf, rel)),
    })
    bound.runSingle()
# rebuild message-based queries (just get_sent right now)
if False:
    script = "SET default_parallel 10;"
    script += load_things
    script += make_things_items
    script += load_and_map_data
    script += """
        non_null =
            FILTER items_with_data BY data#'author_id' IS NOT NULL;

        potential_columns =
            FOREACH non_null
            GENERATE
                CONCAT('sent.', com.reddit.pig.TO_36(data#'author_id')) AS rowkey,
                com.reddit.pig.MAKE_FULLNAME('message', id) AS colkey,
                timestamp AS value;
    """
    script += store_top_1000_per_rowkey

    compiled = Pig.compile(script)
    bound = compiled.bind({
        "THINGS": INPUT_ROOT + "message.dump",
        "DATA": INPUT_ROOT + "message-data.dump",
        "OUTPUT": "/".join((OUTPUT_ROOT, "UserQueryCache", "sent")),
    })
    result = bound.runSingle()
# rebuild comment-based queries
if True:
    script = "SET default_parallel 10;"
    script += load_things
    script += make_things_items
    script += load_and_map_data
    script += """
        SPLIT items_with_data INTO
            spam_comments IF spam == 't',
            ham_comments IF spam == 'f';

        ham_comments_with_name =
            FOREACH ham_comments GENERATE 'sr_comments' AS name, *;

        reported_comments =
            FILTER ham_comments BY (int)data#'reported' > 0;

        reported_comments_with_name =
            FOREACH reported_comments GENERATE 'reported_comments' AS name, *;

        spam_comments_with_name =
            FOREACH spam_comments GENERATE 'spam_comments' AS name, *;

        comments_with_name =
            UNION ONSCHEMA ham_comments_with_name,
                           reported_comments_with_name,
                           spam_comments_with_name;

        potential_columns =
            FOREACH comments_with_name GENERATE
                CONCAT(name, CONCAT('.', com.reddit.pig.TO_36(data#'sr_id'))) AS rowkey,
                com.reddit.pig.MAKE_FULLNAME('comment', id) AS colkey,
                timestamp AS value;
    """
    script += store_top_1000_per_rowkey

    compiled = Pig.compile(script)
    bound = compiled.bind({
        "THINGS": INPUT_ROOT + "comment.dump",
        "DATA": INPUT_ROOT + "comment-data.dump",
        "OUTPUT": "/".join((OUTPUT_ROOT, "SubredditQueryCache", "comment")),
    })
    result = bound.runSingle()