From 6a2c4afcb296b60aaae3e8480c6af8d95ac0dff3 Mon Sep 17 00:00:00 2001
From: Neil Williams
Date: Wed, 20 Jun 2012 10:58:31 -0700
Subject: [PATCH] Directly access traffic database for traffic queries.

Traffic stats are currently pulled via HTTP requests from the traffic
app, a separate pylons application. This patch retires that external
app by moving the queries into the reddit app itself while keeping the
existing API nearly verbatim.
---
 r2/r2/lib/pages/pages.py             |   8 +-
 r2/r2/lib/traffic.py                 | 140 ++++++------
 r2/r2/lib/utils/_utils.pyx           |   3 +
 r2/r2/models/traffic.py              | 310 +++++++++++++++++++++++++++
 r2/r2/templates/promotedtraffic.html |   2 +-
 r2/r2/templates/reddittraffic.html   |   2 +-
 6 files changed, 381 insertions(+), 84 deletions(-)

diff --git a/r2/r2/lib/pages/pages.py b/r2/r2/lib/pages/pages.py
index 3acd6d072..9a766dd1c 100755
--- a/r2/r2/lib/pages/pages.py
+++ b/r2/r2/lib/pages/pages.py
@@ -3256,10 +3256,8 @@ class PromotedTraffic(Traffic):
         self.preliminary = (until + datetime.timedelta(1) > now)
         self.traffic = load_traffic('hour', "thing", thing._fullname,
                                     start_time = d, stop_time = until)
-        # load monthly totals if we have them, otherwise use the daily totals
-        self.totals = load_traffic('month', "thing", thing._fullname)
-        if not self.totals:
-            self.totals = load_traffic('day', "thing", thing._fullname)
+        # TODO: ditch uniques and just sum the hourly values
+        self.totals = load_traffic('day', "thing", thing._fullname)
         # generate a list of
         # (uniq impressions, # impressions, uniq clicks, # clicks)
         if self.totals:
@@ -3301,7 +3299,7 @@ class PromotedTraffic(Traffic):
             uimp, nimp, ucli, ncli = data
             return (label, num(uimp), num(nimp), num(ucli), num(ncli),
-                    ("%.2f%%" % (float(100*ucli) / uimp)) if nimp else "--.--%",
+                    ("%.2f%%" % (float(100*ucli) / uimp)) if uimp else "--.--%",
                     ("%.2f%%" % (float(100*ncli) / nimp)) if nimp else "--.--%")
 
         for date, data in self.traffic:
diff --git a/r2/r2/lib/traffic.py b/r2/r2/lib/traffic.py
index 929128b57..cc6a1b1e5 100644
--- a/r2/r2/lib/traffic.py
+++ b/r2/r2/lib/traffic.py
@@ -20,87 +20,73 @@
 # Inc. All Rights Reserved.
 ###############################################################################
 
-from httplib import HTTPConnection
-from urlparse import urlparse
-from cPickle import loads
-from utils import query_string
-import os, socket, time, datetime
-from pylons import g
-from r2.lib.memoize import memoize
+import time
+import datetime
 
-def load_traffic_uncached(interval, what, iden,
-                          start_time = None, stop_time = None,
-                          npoints = None):
-    """
-    Fetches pickled traffic from the traffic server and returns it as a list.
-    On connection failure (or no data) returns an empy list.
- """ - from r2.lib import promote - def format_date(d): - if hasattr(d, "tzinfo"): - if d.tzinfo is None: - d = d.replace(tzinfo = g.tz) - else: - d = d.astimezone(g.tz) - return ":".join(map(str, d.timetuple()[:6])) - - traffic_url = os.path.join(g.traffic_url, interval, what, iden) - args = {} - if what == 'thing' and interval == 'hour': - if start_time: - if not isinstance(start_time, datetime.datetime): - start_time = datetime.datetime(*start_time.timetuple()[:3]) - start_time -= promote.timezone_offset - if stop_time: - if not isinstance(stop_time, datetime.datetime): - stop_time = datetime.datetime(*stop_time.timetuple()[:3]) - stop_time -= promote.timezone_offset - if start_time: - args['start_time'] = format_date(start_time) - - if stop_time: - args['stop_time'] = format_date(stop_time) - if npoints: - args['n'] = npoints - u = urlparse(traffic_url) - try: - conn = HTTPConnection(u.hostname, u.port) - conn.request("GET", u.path + query_string(args)) - res = conn.getresponse() - res = loads(res.read()) if res.status == 200 else [] - conn.close() - return res - except socket.error: - return [] +from r2.lib import promote +from r2.models import traffic -#@memoize("cached_traffic", time = 60) -def load_traffic(interval, what, iden = '', - start_time = None, stop_time = None, - npoints = None): - """ - interval = (hour, day, month) - - what = (reddit, lang, thing, promos) - - iden is the specific thing (reddit name, language name, thing - fullname) that one is seeking traffic for. - """ - res = load_traffic_uncached(interval, what, iden, - start_time = start_time, stop_time = stop_time, - npoints = npoints) - if res and isinstance(res[0][0], datetime.datetime): - dates, data = zip(*res) - if interval == 'hour': - # shift hourly totals into local time zone. 
- dates = [x.replace(tzinfo=None) - - datetime.timedelta(0, time.timezone) for x in dates] - else: - # we don't care about the hours - dates = [x.date() for x in dates] - res = zip(dates, data) +def force_datetime(dt): + if isinstance(dt, datetime.datetime): + return dt + elif isinstance(dt, datetime.date): + return datetime.datetime.combine(dt, datetime.time()) + else: + raise NotImplementedError() + + +def load_traffic(interval, what, iden="", + start_time=None, stop_time=None, + npoints=None): + if what == "reddit": + sr_traffic = traffic.PageviewsBySubreddit.history(interval, iden) + + # add in null values for cname stuff + res = [(t, v + (0, 0)) for (t, v) in sr_traffic] + + # day interval needs subscription numbers + if interval == "day": + subscriptions = traffic.SubscriptionsBySubreddit.history(interval, + iden) + res = traffic.zip_timeseries(res, subscriptions) + elif what == "total": + res = traffic.SitewidePageviews.history(interval) + elif what == "summary" and iden == "reddit" and interval == "month": + sr_traffic = traffic.PageviewsBySubreddit.top_last_month() + # add in null values for cname stuff + # return directly because this doesn't have a date parameter first + return [(t, v + (0, 0)) for (t, v) in sr_traffic] + elif what == "promos" and interval == "day": + pageviews = traffic.AdImpressionsByCodename.historical_totals(interval) + clicks = traffic.ClickthroughsByCodename.historical_totals(interval) + res = traffic.zip_timeseries(pageviews, clicks) + elif what == "thing" and interval == "hour" and start_time: + start_time = force_datetime(start_time) - promote.timezone_offset + stop_time = force_datetime(stop_time) - promote.timezone_offset + pageviews = traffic.AdImpressionsByCodename.promotion_history(iden, + start_time, + stop_time) + clicks = traffic.ClickthroughsByCodename.promotion_history(iden, + start_time, + stop_time) + res = traffic.zip_timeseries(pageviews, clicks) + elif what == "thing" and not start_time: + pageviews = traffic.AdImpressionsByCodename.history(interval, iden) + clicks = traffic.ClickthroughsByCodename.history(interval, iden) + res = traffic.zip_timeseries(pageviews, clicks) + else: + raise NotImplementedError() + + if interval == "hour": + # convert to local time + tzoffset = datetime.timedelta(0, time.timezone) + res = [(d - tzoffset, v) for d, v in res] + else: + res = [(d.date(), v) for d, v in res] + return res - + def load_summary(what, interval = "month", npoints = 50): return load_traffic(interval, "summary", what, npoints = npoints) diff --git a/r2/r2/lib/utils/_utils.pyx b/r2/r2/lib/utils/_utils.pyx index eef03ec2d..e00296dfd 100644 --- a/r2/r2/lib/utils/_utils.pyx +++ b/r2/r2/lib/utils/_utils.pyx @@ -126,6 +126,9 @@ def timefromnow(interval): from pylons import g return datetime.now(g.tz) + timeinterval_fromstr(interval) +def timedelta_by_name(interval): + return timeinterval_fromstr('1 ' + interval) + cdef dict timeintervald = dict(second = 1, minute = 60, hour = 60 * 60, diff --git a/r2/r2/models/traffic.py b/r2/r2/models/traffic.py index e89abf06e..89f7a4cf2 100644 --- a/r2/r2/models/traffic.py +++ b/r2/r2/models/traffic.py @@ -20,12 +20,19 @@ # Inc. All Rights Reserved. 
###############################################################################
 
+import datetime
+
 from pylons import g
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.schema import Column
 from sqlalchemy.types import DateTime, Integer, String, BigInteger
+from sqlalchemy.sql.expression import desc
+from sqlalchemy.sql.functions import sum
 
+from r2.lib.utils import timedelta_by_name
+from r2.models.link import Link
+from r2.lib.memoize import memoize
 
 engine = g.dbm.get_engine("traffic")
 
@@ -33,6 +40,228 @@ Session = scoped_session(sessionmaker(bind=engine))
 Base = declarative_base(bind=engine)
 
 
+def memoize_traffic(**memoize_kwargs):
+    """Wrap the memoize decorator and automatically determine memoize key.
+
+    The memoize key is based on the full name (including class name) of the
+    method being memoized.
+
+    """
+    def memoize_traffic_decorator(fn):
+        def memoize_traffic_wrapper(cls, *args, **kwargs):
+            method = ".".join((cls.__name__, fn.__name__))
+            actual_memoize_decorator = memoize(method, **memoize_kwargs)
+            actual_memoize_wrapper = actual_memoize_decorator(fn)
+            return actual_memoize_wrapper(cls, *args, **kwargs)
+        return memoize_traffic_wrapper
+    return memoize_traffic_decorator
+
+
+class PeekableIterator(object):
+    """Iterator that supports peeking at the next item in the iterable."""
+
+    def __init__(self, iterable):
+        self.iterator = iter(iterable)
+        self.item = None
+
+    def peek(self):
+        """Get the next item in the iterable without advancing our position."""
+        if not self.item:
+            try:
+                self.item = self.iterator.next()
+            except StopIteration:
+                return None
+        return self.item
+
+    def next(self):
+        """Get the next item in the iterable and advance our position."""
+        item = self.peek()
+        self.item = None
+        return item
+
+
+def zip_timeseries(*series):
+    """Zip timeseries data while gracefully handling gaps in the data.
+
+    Timeseries data is expected to be a sequence of two-tuples (date, values),
+    in descending date order as produced by the queries in this module. Values
+    is expected itself to be a tuple. The width of the values tuples should be
+    the same across all elements in a timeseries sequence. The result will be
+    a single sequence in timeseries format, newest first.
+
+    Gaps in sequences are filled with an appropriate number of zeros based on
+    the size of the first value-tuple of that sequence.
+
+    """
+
+    iterators = [PeekableIterator(s) for s in series]
+    widths = [len(w.peek()[1]) for w in iterators]
+
+    while True:
+        items = [it.peek() for it in iterators]
+        if not any(items):
+            return
+
+        # series are newest-first, so the next slice is the latest head date
+        current_slice = max(item[0] for item in items if item)
+
+        data = []
+        for i, item in enumerate(items):
+            # each item is (date, data)
+            if item and item[0] == current_slice:
+                data.extend(item[1])
+                iterators[i].next()
+            else:
+                data.extend([0] * widths[i])
+
+        yield current_slice, tuple(data)
+
+
+def decrement_month(date, amount=1):
+    """Given a truncated datetime, return a new one amount months earlier."""
+
+    if date.day != 1:
+        raise ValueError("Input must be truncated to the 1st of the month.")
+
+    for _ in xrange(amount):
+        date -= datetime.timedelta(days=1)
+        date = date.replace(day=1)
+    return date
+
+
+def fill_gaps_generator(interval, start_time, stop_time, query, *columns):
+    """Generate a timeseries sequence with a value for every sample expected.
+
+    Iterate backwards in steps specified by interval from the most recent date
+    (stop_time) to the oldest (start_time) and pull the columns listed out of
+    query. If the query doesn't have data for a time slice, fill the gap with
+    an appropriate number of zeroes.
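+
+    For example (illustrative), with a "day" interval, a single
+    "unique_count" column, and a query that has no row for June 2nd, the
+    generator would yield, newest first:
+
+        (2012-06-03, (95,))
+        (2012-06-02, (0,))
+        (2012-06-01, (120,))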
+ + """ + + iterator = PeekableIterator(query) + step = timedelta_by_name(interval) + current_slice = stop_time + + while current_slice > start_time: + row = iterator.peek() + + if row and row.date == current_slice: + yield current_slice, tuple(getattr(row, c) for c in columns) + iterator.next() + else: + yield current_slice, tuple(0 for c in columns) + + # moving backwards a month isn't a fixed timedelta -- special case it + if interval != "month": + current_slice -= step + else: + current_slice = decrement_month(current_slice) + + +def fill_gaps(*args, **kwargs): + """Listify the generator returned by fill_gaps_generator for `memoize`.""" + generator = fill_gaps_generator(*args, **kwargs) + return list(generator) + + +time_range_by_interval = dict(hour=datetime.timedelta(days=4), + day=datetime.timedelta(weeks=8), + month=datetime.timedelta(weeks=52)) + + +def time_range(interval): + """Calculate the time range to view for a given level of precision. + + The coarser our granularity, the more history we'll want to see. + + """ + + # the stop time is the most recent slice-time; get this by truncating + # the appropriate amount from the current time + stop_time = datetime.datetime.utcnow() + stop_time = stop_time.replace(minute=0, second=0, microsecond=0) + if interval in ("day", "month"): + stop_time = stop_time.replace(hour=0) + if interval == "month": + stop_time = stop_time.replace(day=1) + + # then the start time is easy to work out + range = time_range_by_interval[interval] + start_time = stop_time - range + + return start_time, stop_time + + +def make_history_query(cls, interval): + """Build a generic query showing the history of a given aggregate.""" + + start_time, stop_time = time_range(interval) + q = (Session.query(cls) + .filter(cls.date >= start_time)) + + # subscription stats doesn't have an interval (it's only daily) + if hasattr(cls, "interval"): + q = q.filter(cls.interval == interval) + + q = q.order_by(desc(cls.date)) + + return start_time, stop_time, q + + +def top_last_month(cls, key): + """Aggregate a listing of the top items (by uniques) last month. + + We use the last month because it's guaranteed to be fully computed and + therefore will be more meaningful. + + """ + cur_month = datetime.date.today().replace(day=1) + last_month = decrement_month(cur_month) + + q = (Session.query(cls) + .filter(cls.date == last_month) + .filter(cls.interval == "month") + .order_by(desc(cls.date), desc(cls.unique_count)) + .limit(55)) + + return [(getattr(r, key), (r.unique_count, r.pageview_count)) + for r in q.all()] + + +def totals(cls, interval): + """Aggregate sitewide totals for self-serve promotion traffic. + + We only aggregate codenames that start with a link type prefix which + effectively filters out all DART / 300x100 etc. traffic numbers. 
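+
+    The result is a newest-first timeseries of one-tuples of summed
+    pageviews, e.g. (illustrative): [(2012-06-03, (52310,)), ...].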
+
+    """
+    start_time, stop_time = time_range(interval)
+    q = (Session.query(cls.date, sum(cls.pageview_count).label("sum"))
+         .filter(cls.interval == interval)
+         .filter(cls.date > start_time)
+         .filter(cls.codename.startswith(Link._type_prefix))
+         .group_by(cls.date)
+         .order_by(desc(cls.date)))
+    return fill_gaps(interval, start_time, stop_time, q, "sum")
+
+
+def promotion_history(cls, codename, start, stop):
+    """Get hourly traffic for a self-serve promotion across all campaigns."""
+    # order newest-first so results can be merged with zip_timeseries
+    q = (Session.query(cls)
+         .filter(cls.interval == "hour")
+         .filter(cls.codename == codename)
+         .filter(cls.date >= start)
+         .filter(cls.date <= stop)
+         .order_by(desc(cls.date)))
+    return [(r.date, (r.unique_count, r.pageview_count)) for r in q.all()]
+
+
+@memoize("traffic_last_modified", time=60 * 10)
+def get_traffic_last_modified():
+    """Guess how far behind the traffic processing system is."""
+    return (Session.query(SitewidePageviews.date)
+            .order_by(desc(SitewidePageviews.date))
+            .limit(1)
+            .one()).date
+
+
 class SitewidePageviews(Base):
     __tablename__ = "traffic_aggregate"
 
@@ -41,6 +270,13 @@ class SitewidePageviews(Base):
     date = Column(DateTime(), nullable=False, primary_key=True)
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", BigInteger())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
 
 class PageviewsBySubreddit(Base):
     __tablename__ = "traffic_subreddits"
@@ -51,6 +287,19 @@ class PageviewsBySubreddit(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, subreddit):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.subreddit == subreddit)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600 * 6)
+    def top_last_month(cls):
+        return top_last_month(cls, "subreddit")
+
 
 class PageviewsBySubredditAndPath(Base):
     __tablename__ = "traffic_srpaths"
@@ -71,6 +320,19 @@ class PageviewsByLanguage(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, lang):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.lang == lang)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600 * 6)
+    def top_last_month(cls):
+        return top_last_month(cls, "lang")
+
 
 class ClickthroughsByCodename(Base):
     __tablename__ = "traffic_click"
@@ -81,6 +343,24 @@ class ClickthroughsByCodename(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, codename):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.codename == codename)
+        return fill_gaps(interval, start_time, stop_time, q, "unique_count",
+                         "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def promotion_history(cls, codename, start, stop):
+        return promotion_history(cls, codename, start, stop)
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def historical_totals(cls, interval):
+        return totals(cls, interval)
+
 
 class TargetedClickthroughsByCodename(Base):
     __tablename__ = "traffic_clicktarget"
@@ -102,6 +382,29 @@ class AdImpressionsByCodename(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, codename):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.codename == codename)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def promotion_history(cls, codename, start, stop):
+        return promotion_history(cls, codename, start, stop)
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def historical_totals(cls, interval):
+        return totals(cls, interval)
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def top_last_month(cls):
+        return top_last_month(cls, "codename")
+
 
 class TargetedImpressionsByCodename(Base):
     __tablename__ = "traffic_thingtarget"
@@ -121,6 +424,13 @@ class SubscriptionsBySubreddit(Base):
     date = Column(DateTime(), nullable=False, primary_key=True)
     subscriber_count = Column("unique", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600 * 6)
+    def history(cls, interval, subreddit):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.subreddit == subreddit)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "subscriber_count")
 
 # create the tables if they don't exist
 if g.db_create_tables:
diff --git a/r2/r2/templates/promotedtraffic.html b/r2/r2/templates/promotedtraffic.html
index 5aa296d16..1c48f2bcf 100644
--- a/r2/r2/templates/promotedtraffic.html
+++ b/r2/r2/templates/promotedtraffic.html
@@ -101,7 +101,7 @@
           ${thing.viewers}
           ${num(ucli)}${'*' if thing.preliminary else ''}
           ${num(ncli)}
-          ${("%.2f%%" % (float(100*ucli) / uimp)) if nimp else "--.--%"}
+          ${("%.2f%%" % (float(100*ucli) / uimp)) if uimp else "--.--%"}
           ${("%.2f%%" % (float(100*ncli) / nimp)) if nimp else "--.--%"}
 
         %if thing.preliminary:
diff --git a/r2/r2/templates/reddittraffic.html b/r2/r2/templates/reddittraffic.html
index 48ae04db0..53adf8a19 100644
--- a/r2/r2/templates/reddittraffic.html
+++ b/r2/r2/templates/reddittraffic.html
@@ -62,7 +62,7 @@ ${unsafe(js.use('flot'))}
   %endif
 
 
-  %for x, (date, data) in enumerate(reversed(thing.day_data)):
+  %for x, (date, data) in enumerate(thing.day_data):
     
      ${date.strftime("%Y-%m-%d")}
      <%
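
For illustration only, not part of the patch: the query helpers above all
produce newest-first, gap-filled series, and zip_timeseries merges them,
zero-padding whichever series is missing a time slice. A quick sketch,
assuming a checkout with this patch applied so r2.models.traffic imports:

    import datetime
    from r2.models.traffic import zip_timeseries

    d = datetime.datetime
    # newest-first, as fill_gaps produces; pageviews has no row for
    # June 2nd, clicks has no row for June 1st
    pageviews = [(d(2012, 6, 3), (95, 2100)), (d(2012, 6, 1), (120, 3400))]
    clicks = [(d(2012, 6, 3), (4,)), (d(2012, 6, 2), (7,))]

    print list(zip_timeseries(pageviews, clicks))
    # yields (dates abbreviated):
    # [(2012-06-03, (95, 2100, 4)),
    #  (2012-06-02, (0, 0, 7)),       # pageviews gap: two zeros
    #  (2012-06-01, (120, 3400, 0))]  # clicks gap: one zero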