mirror of
https://github.com/reddit-archive/reddit.git
synced 2026-01-28 16:28:01 -05:00
159 lines
6.5 KiB
Pig
Executable File
159 lines
6.5 KiB
Pig
Executable File
/* EMR Version
|
|
*
|
|
* Process hourly logfile
|
|
* for each category sitewide/subreddit/srpath/lang/clicks/clicks_targeted/thing/thingtarget
|
|
* generate output with entries like:
|
|
* category, unique_id, count (e.g. subreddit: pics, 123456, 9)
|
|
*
|
|
* Needs to be passed: LOGFILE, OUTPUT
|
|
*/
|
|
|
|
/****************************************************
|
|
* DEFINITIONS
|
|
****************************************************/
|
|
%default TEMPDIR 'hdfs:///tmp/processed_logs';
|
|
|
|
-- Binaries - location is specified in traffic_bootstrap.sh
|
|
DEFINE PARSE_HOUR `/home/hadoop/traffic/parse_hour`;
|
|
DEFINE DECRYPT_USERINFO `/home/hadoop/traffic/decrypt_userinfo`;
|
|
DEFINE VERIFY `/home/hadoop/traffic/verify`;
|
|
|
|
-- Pixel definitions
|
|
%default URL_USERINFO '/pixel/of_destiny.png';
|
|
%default URL_ADFRAME '/pixel/of_defenestration.png';
|
|
%default URL_PROMOTEDLINK '/pixel/of_doom.png';
|
|
%default URL_CLICK '/click';
|
|
|
|
-- Cleanup
|
|
rmf $OUTPUT
|
|
rmf $TEMPDIR
|
|
|
|
/****************************************************
|
|
* LOAD LOGFILE
|
|
****************************************************/
|
|
|
|
log_raw = LOAD '$LOGFILE' USING TextLoader() AS (line);
|
|
log_parsed = STREAM log_raw THROUGH PARSE_HOUR AS (ip, path:chararray, query, unique_id);
|
|
|
|
SPLIT log_parsed INTO
|
|
pageviews_with_path IF path == '$URL_USERINFO',
|
|
unverified_hits IF (path == '$URL_ADFRAME' OR
|
|
path == '$URL_PROMOTEDLINK' OR
|
|
path == '$URL_CLICK');
|
|
|
|
pageviews_encrypted = FOREACH pageviews_with_path GENERATE unique_id, query;
|
|
|
|
-- Store intermediate results because of bug: https://issues.apache.org/jira/browse/PIG-2442
|
|
-- If/when EMR gets to 0.10 or 0.9.3 we can remove this
|
|
STORE pageviews_encrypted INTO '$TEMPDIR/pageviews_encrypted';
|
|
STORE unverified_hits INTO '$TEMPDIR/unverified_hits';
|
|
|
|
/****************************************************
|
|
* PAGEVIEWS
|
|
****************************************************/
|
|
|
|
pageviews_encrypted = LOAD '$TEMPDIR/pageviews_encrypted' AS (unique_id, query);
|
|
|
|
pageviews = STREAM pageviews_encrypted THROUGH DECRYPT_USERINFO AS (unique_id, srpath, subreddit, lang, cname);
|
|
|
|
-- sitewide
|
|
sitewide_pageviews = FOREACH pageviews GENERATE unique_id; -- (unique_id)
|
|
sitewide_hourly_uniques_grouped = GROUP sitewide_pageviews BY unique_id; -- (unique_id, {(unique_id), ...}
|
|
sitewide_hourly_uniques = FOREACH sitewide_hourly_uniques_grouped
|
|
GENERATE group AS unique_id, COUNT(sitewide_pageviews) AS count; -- (unique_id, count)
|
|
|
|
STORE sitewide_hourly_uniques INTO '$OUTPUT/sitewide';
|
|
|
|
-- subreddit
|
|
subreddit_pageviews_filtered = FILTER pageviews
|
|
BY subreddit IS NOT NULL; -- exclude entries without subreddit
|
|
subreddit_pageviews_raw = FOREACH subreddit_pageviews_filtered
|
|
GENERATE subreddit, unique_id; -- limit to (subreddit, unique_id)
|
|
subreddit_hourly_uniques_grouped = GROUP subreddit_pageviews_raw
|
|
BY (subreddit, unique_id); -- (subreddit, unique_id, {(subreddit, unique_id), ...})
|
|
subreddit_hourly_uniques = FOREACH subreddit_hourly_uniques_grouped
|
|
GENERATE group.subreddit, group.unique_id,
|
|
COUNT(subreddit_pageviews_raw) AS count; -- (subreddit, unique_id, count)
|
|
|
|
STORE subreddit_hourly_uniques INTO '$OUTPUT/subreddit';
|
|
|
|
-- subreddit path
|
|
srpath_filtered = FILTER pageviews BY srpath IS NOT NULL;
|
|
srpath_pageviews = FOREACH srpath_filtered
|
|
GENERATE srpath, unique_id;
|
|
srpath_hourly_uniques_grouped = GROUP srpath_pageviews
|
|
BY (srpath, unique_id);
|
|
srpath_hourly_uniques = FOREACH srpath_hourly_uniques_grouped
|
|
GENERATE group.srpath, group.unique_id,
|
|
COUNT(srpath_pageviews) AS count;
|
|
|
|
STORE srpath_hourly_uniques INTO '$OUTPUT/srpath';
|
|
|
|
-- language
|
|
lang_filtered = FILTER pageviews BY lang IS NOT NULL;
|
|
lang_pageviews = FOREACH lang_filtered GENERATE lang, unique_id;
|
|
lang_hourly_uniques_grouped = GROUP lang_pageviews BY (lang, unique_id);
|
|
lang_hourly_uniques = FOREACH lang_hourly_uniques_grouped
|
|
GENERATE group.lang, group.unique_id,
|
|
COUNT(lang_pageviews) AS count;
|
|
|
|
STORE lang_hourly_uniques INTO '$OUTPUT/lang';
|
|
|
|
/****************************************************
|
|
* HITS
|
|
****************************************************/
|
|
|
|
unverified_hits = LOAD '$TEMPDIR/unverified_hits' AS (ip, path:chararray, query, unique_id);
|
|
|
|
-- process unverified hits
|
|
verified = STREAM unverified_hits THROUGH VERIFY AS (unique_id, path:chararray, fullname, sr);
|
|
|
|
-- ads and promoted links
|
|
|
|
SPLIT verified INTO
|
|
clicks_raw IF path == '$URL_CLICK',
|
|
ad_impressions IF (path == '$URL_ADFRAME' OR
|
|
path == '$URL_PROMOTEDLINK');
|
|
|
|
-- clicks
|
|
clicks = FOREACH clicks_raw GENERATE fullname, unique_id;
|
|
clicks_grouped = GROUP clicks BY (fullname, unique_id);
|
|
clicks_by_hour = FOREACH clicks_grouped
|
|
GENERATE group.fullname, group.unique_id,
|
|
COUNT(clicks) AS count;
|
|
|
|
STORE clicks_by_hour INTO '$OUTPUT/clicks';
|
|
|
|
-- targeted clicks
|
|
targeted_clicks = FOREACH clicks_raw GENERATE fullname, sr, unique_id;
|
|
targeted_clicks_grouped = GROUP targeted_clicks BY (fullname, sr, unique_id);
|
|
targeted_clicks_by_hour = FOREACH targeted_clicks_grouped
|
|
GENERATE group.fullname, group.sr, group.unique_id,
|
|
COUNT(targeted_clicks) AS count;
|
|
|
|
STORE targeted_clicks_by_hour INTO '$OUTPUT/clicks_targeted';
|
|
|
|
-- things
|
|
thing_impressions = FOREACH ad_impressions GENERATE fullname, unique_id;
|
|
thing_impressions_grouped = GROUP thing_impressions BY (fullname, unique_id);
|
|
thing_impressions_hourly = FOREACH thing_impressions_grouped
|
|
GENERATE group.fullname, group.unique_id,
|
|
COUNT(thing_impressions) AS count;
|
|
|
|
STORE thing_impressions_hourly INTO '$OUTPUT/thing';
|
|
|
|
-- targeted things
|
|
targeted_thing_impressions = FOREACH ad_impressions
|
|
GENERATE fullname, sr, unique_id;
|
|
targeted_thing_impressions_grouped = GROUP targeted_thing_impressions
|
|
BY (fullname, sr, unique_id);
|
|
targeted_thing_impressions_hourly = FOREACH targeted_thing_impressions_grouped
|
|
GENERATE group.fullname, group.sr,
|
|
group.unique_id,
|
|
COUNT(targeted_thing_impressions)
|
|
AS count;
|
|
|
|
STORE targeted_thing_impressions_hourly INTO '$OUTPUT/thingtarget';
|
|
|
|
rmf $TEMPDIR
|