Directly access traffic database for traffic queries.

Traffic stats are currently pulled from the traffic app, a separate
pylons application, via HTTP requests. This patch moves those queries
into the reddit app itself while keeping the existing load_traffic API
near-verbatim, which lets us ditch the external traffic app entirely.
Neil Williams
2012-06-20 10:58:31 -07:00
parent 49f688df10
commit 6a2c4afcb2
6 changed files with 381 additions and 84 deletions
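
For orientation, a rough sketch of the call path before and after this
patch (illustrative only; the subreddit name is made up and the return
shape is abbreviated):

    # before: r2.lib.traffic fetched cPickled data over HTTP from the
    # separate traffic app, roughly:
    #     GET <g.traffic_url>/day/reddit/pics
    # after: the same helper is answered by SQLAlchemy queries against
    # the "traffic" database directly:
    from r2.lib.traffic import load_traffic
    series = load_traffic("day", "reddit", "pics")
    # -> [(date, values_tuple), ...]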

View File

@@ -3256,10 +3256,8 @@ class PromotedTraffic(Traffic):
         self.preliminary = (until + datetime.timedelta(1) > now)
         self.traffic = load_traffic('hour', "thing", thing._fullname,
                                     start_time = d, stop_time = until)
-        # load monthly totals if we have them, otherwise use the daily totals
-        self.totals = load_traffic('month', "thing", thing._fullname)
-        if not self.totals:
-            self.totals = load_traffic('day', "thing", thing._fullname)
+        # TODO: ditch uniques and just sum the hourly values
+        self.totals = load_traffic('day', "thing", thing._fullname)
         # generate a list of
         # (uniq impressions, # impressions, uniq clicks, # clicks)
         if self.totals:
@@ -3301,7 +3299,7 @@ class PromotedTraffic(Traffic):
             uimp, nimp, ucli, ncli = data
             return (label,
                     num(uimp), num(nimp), num(ucli), num(ncli),
-                    ("%.2f%%" % (float(100*ucli) / uimp)) if nimp else "--.--%",
+                    ("%.2f%%" % (float(100*ucli) / uimp)) if uimp else "--.--%",
                     ("%.2f%%" % (float(100*ncli) / nimp)) if nimp else "--.--%")
         for date, data in self.traffic:

View File

@@ -20,87 +20,73 @@
 # Inc. All Rights Reserved.
 ###############################################################################
-from httplib import HTTPConnection
-from urlparse import urlparse
-from cPickle import loads
-from utils import query_string
-import os, socket, time, datetime
 from pylons import g
-from r2.lib.memoize import memoize
+import time
+import datetime
-def load_traffic_uncached(interval, what, iden,
-                          start_time = None, stop_time = None,
-                          npoints = None):
-    """
-    Fetches pickled traffic from the traffic server and returns it as a list.
-    On connection failure (or no data) returns an empty list.
-    """
-    from r2.lib import promote
-
-    def format_date(d):
-        if hasattr(d, "tzinfo"):
-            if d.tzinfo is None:
-                d = d.replace(tzinfo = g.tz)
-            else:
-                d = d.astimezone(g.tz)
-        return ":".join(map(str, d.timetuple()[:6]))
-
-    traffic_url = os.path.join(g.traffic_url, interval, what, iden)
-    args = {}
-    if what == 'thing' and interval == 'hour':
-        if start_time:
-            if not isinstance(start_time, datetime.datetime):
-                start_time = datetime.datetime(*start_time.timetuple()[:3])
-            start_time -= promote.timezone_offset
-        if stop_time:
-            if not isinstance(stop_time, datetime.datetime):
-                stop_time = datetime.datetime(*stop_time.timetuple()[:3])
-            stop_time -= promote.timezone_offset
-    if start_time:
-        args['start_time'] = format_date(start_time)
-    if stop_time:
-        args['stop_time'] = format_date(stop_time)
-    if npoints:
-        args['n'] = npoints
-    u = urlparse(traffic_url)
-    try:
-        conn = HTTPConnection(u.hostname, u.port)
-        conn.request("GET", u.path + query_string(args))
-        res = conn.getresponse()
-        res = loads(res.read()) if res.status == 200 else []
-        conn.close()
-        return res
-    except socket.error:
-        return []
+from r2.lib import promote
+from r2.models import traffic
-#@memoize("cached_traffic", time = 60)
-def load_traffic(interval, what, iden = '',
-                 start_time = None, stop_time = None,
-                 npoints = None):
-    """
-    interval = (hour, day, month)
-    what = (reddit, lang, thing, promos)
-    iden is the specific thing (reddit name, language name, thing
-    fullname) that one is seeking traffic for.
-    """
-    res = load_traffic_uncached(interval, what, iden,
-                                start_time = start_time, stop_time = stop_time,
-                                npoints = npoints)
-    if res and isinstance(res[0][0], datetime.datetime):
-        dates, data = zip(*res)
-        if interval == 'hour':
-            # shift hourly totals into local time zone.
-            dates = [x.replace(tzinfo=None) -
-                     datetime.timedelta(0, time.timezone) for x in dates]
-        else:
-            # we don't care about the hours
-            dates = [x.date() for x in dates]
-        res = zip(dates, data)
+def force_datetime(dt):
+    if isinstance(dt, datetime.datetime):
+        return dt
+    elif isinstance(dt, datetime.date):
+        return datetime.datetime.combine(dt, datetime.time())
+    else:
+        raise NotImplementedError()
+def load_traffic(interval, what, iden="",
+                 start_time=None, stop_time=None,
+                 npoints=None):
+    if what == "reddit":
+        sr_traffic = traffic.PageviewsBySubreddit.history(interval, iden)
+        # add in null values for cname stuff
+        res = [(t, v + (0, 0)) for (t, v) in sr_traffic]
+
+        # day interval needs subscription numbers
+        if interval == "day":
+            subscriptions = traffic.SubscriptionsBySubreddit.history(interval,
+                                                                     iden)
+            res = traffic.zip_timeseries(res, subscriptions)
+    elif what == "total":
+        res = traffic.SitewidePageviews.history(interval)
+    elif what == "summary" and iden == "reddit" and interval == "month":
+        sr_traffic = traffic.PageviewsBySubreddit.top_last_month()
+        # add in null values for cname stuff
+        # return directly because this doesn't have a date parameter first
+        return [(t, v + (0, 0)) for (t, v) in sr_traffic]
+    elif what == "promos" and interval == "day":
+        pageviews = traffic.AdImpressionsByCodename.historical_totals(interval)
+        clicks = traffic.ClickthroughsByCodename.historical_totals(interval)
+        res = traffic.zip_timeseries(pageviews, clicks)
+    elif what == "thing" and interval == "hour" and start_time:
+        start_time = force_datetime(start_time) - promote.timezone_offset
+        stop_time = force_datetime(stop_time) - promote.timezone_offset
+        pageviews = traffic.AdImpressionsByCodename.promotion_history(iden,
+                                                                      start_time,
+                                                                      stop_time)
+        clicks = traffic.ClickthroughsByCodename.promotion_history(iden,
+                                                                   start_time,
+                                                                   stop_time)
+        res = traffic.zip_timeseries(pageviews, clicks)
+    elif what == "thing" and not start_time:
+        pageviews = traffic.AdImpressionsByCodename.history(interval, iden)
+        clicks = traffic.ClickthroughsByCodename.history(interval, iden)
+        res = traffic.zip_timeseries(pageviews, clicks)
+    else:
+        raise NotImplementedError()
+
+    if interval == "hour":
+        # convert to local time
+        tzoffset = datetime.timedelta(0, time.timezone)
+        res = [(d - tzoffset, v) for d, v in res]
+    else:
+        res = [(d.date(), v) for d, v in res]
+
+    return res
 
 def load_summary(what, interval = "month", npoints = 50):
     return load_traffic(interval, "summary", what, npoints = npoints)
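
As a usage sketch of the rewritten dispatch (assumes a populated traffic
database; the identifiers and dates are examples, not values from this
patch):

    import datetime
    d1 = datetime.datetime(2012, 6, 18)
    d2 = datetime.datetime(2012, 6, 20)

    load_traffic("day", "reddit", "pics")    # subreddit pageviews (+ subscriptions)
    load_traffic("hour", "total")            # sitewide pageviews
    load_summary("reddit")                   # top subreddits last month
    load_traffic("hour", "thing", "t3_abc",  # promoted-link impressions/clicks
                 start_time=d1, stop_time=d2)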

View File

@@ -126,6 +126,9 @@ def timefromnow(interval):
     from pylons import g
     return datetime.now(g.tz) + timeinterval_fromstr(interval)
 
+def timedelta_by_name(interval):
+    return timeinterval_fromstr('1 ' + interval)
+
 cdef dict timeintervald = dict(second = 1,
                                minute = 60,
                                hour = 60 * 60,
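
Assuming timeinterval_fromstr keeps its existing behavior of turning
strings like '1 hour' into timedeltas, the new helper behaves roughly as:

    # timedelta_by_name('hour') == datetime.timedelta(hours=1)
    # timedelta_by_name('day')  == datetime.timedelta(days=1)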

View File

@@ -20,12 +20,19 @@
 # Inc. All Rights Reserved.
 ###############################################################################
+import datetime
+
 from pylons import g
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.schema import Column
 from sqlalchemy.types import DateTime, Integer, String, BigInteger
+from sqlalchemy.sql.expression import desc
+from sqlalchemy.sql.functions import sum
+from r2.lib.utils import timedelta_by_name
+from r2.models.link import Link
+from r2.lib.memoize import memoize
 
 engine = g.dbm.get_engine("traffic")
@@ -33,6 +40,228 @@ Session = scoped_session(sessionmaker(bind=engine))
 Base = declarative_base(bind=engine)
 
+def memoize_traffic(**memoize_kwargs):
+    """Wrap the memoize decorator and automatically determine memoize key.
+
+    The memoize key is based off the full name (including class name) of the
+    method being memoized.
+
+    """
+    def memoize_traffic_decorator(fn):
+        def memoize_traffic_wrapper(cls, *args, **kwargs):
+            method = ".".join((cls.__name__, fn.__name__))
+            actual_memoize_decorator = memoize(method, **memoize_kwargs)
+            actual_memoize_wrapper = actual_memoize_decorator(fn)
+            return actual_memoize_wrapper(cls, *args, **kwargs)
+        return memoize_traffic_wrapper
+    return memoize_traffic_decorator
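
The effect, sketched with a hypothetical class (expensive_query is a
stand-in, not something in this patch):

    class Example(object):
        @classmethod
        @memoize_traffic(time=3600)
        def history(cls, interval):
            return expensive_query(interval)  # hypothetical

    # Example.history("day") is cached under the key "Example.history",
    # so each model class gets its own cache namespace automatically.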
+class PeekableIterator(object):
+    """Iterator that supports peeking at the next item in the iterable."""
+
+    def __init__(self, iterable):
+        self.iterator = iter(iterable)
+        self.item = None
+
+    def peek(self):
+        """Get the next item in the iterable without advancing our position."""
+        if not self.item:
+            try:
+                self.item = self.iterator.next()
+            except StopIteration:
+                return None
+        return self.item
+
+    def next(self):
+        """Get the next item in the iterable and advance our position."""
+        item = self.peek()
+        self.item = None
+        return item
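
A standalone sketch of the behavior (Python 2, matching the .next()
protocol above):

    it = PeekableIterator([1, 2])
    assert it.peek() == 1     # does not advance
    assert it.next() == 1     # advances
    assert it.next() == 2
    assert it.peek() is None  # exhausted: peek returns None, never raises

Note that peek() tests truthiness rather than comparing against a
sentinel, so a falsy item (0, (), None) could be silently skipped; the
(date, values) tuples used here are always truthy, so it doesn't matter
in practice.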
+def zip_timeseries(*series):
+    """Zip timeseries data while gracefully handling gaps in the data.
+
+    Timeseries data is expected to be a sequence of two-tuples (date, values).
+    Values is expected itself to be a tuple. The width of the values tuples
+    should be the same across all elements in a timeseries sequence. The result
+    will be a single sequence in timeseries format.
+
+    Gaps in sequences are filled with an appropriate number of zeros based on
+    the size of the first value-tuple of that sequence.
+
+    """
+    iterators = [PeekableIterator(s) for s in series]
+    widths = [len(w.peek()[1]) for w in iterators]
+
+    while True:
+        items = [it.peek() for it in iterators]
+        if not any(items):
+            return
+
+        current_slice = min(item[0] for item in items if item)
+
+        data = []
+        for i, item in enumerate(items):
+            # each item is (date, data)
+            if item and item[0] == current_slice:
+                data.extend(item[1])
+                iterators[i].next()
+            else:
+                data.extend([0] * widths[i])
+
+        yield current_slice, tuple(data)
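
A worked example of the gap filling (integer "dates" for brevity; real
callers pass datetimes):

    impressions = [(1, (10, 100)), (2, (20, 200))]  # width 2: (unique, total)
    clicks = [(2, (5,)), (3, (7,))]                 # width 1, gap at t=1
    assert list(zip_timeseries(impressions, clicks)) == [
        (1, (10, 100, 0)),   # clicks missing: padded with one zero
        (2, (20, 200, 5)),
        (3, (0, 0, 7)),      # impressions missing: padded with two zeros
    ]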
+def decrement_month(date, amount=1):
+    """Given a truncated datetime, return a new one, one month in the past."""
+    if date.day != 1:
+        raise ValueError("Input must be truncated to the 1st of the month.")
+    date -= datetime.timedelta(days=1)
+    return date.replace(day=1)
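
For example (plain dates work too, as in top_last_month below):

    assert decrement_month(datetime.date(2012, 3, 1)) == datetime.date(2012, 2, 1)
    assert decrement_month(datetime.date(2012, 1, 1)) == datetime.date(2011, 12, 1)
    decrement_month(datetime.date(2012, 3, 15))  # raises ValueError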
+def fill_gaps_generator(interval, start_time, stop_time, query, *columns):
+    """Generate a timeseries sequence with a value for every sample expected.
+
+    Iterate backwards in steps specified by interval from the most recent date
+    (stop_time) to the oldest (start_time) and pull the columns listed out of
+    query. If the query doesn't have data for a time slice, fill the gap with
+    an appropriate number of zeroes.
+
+    """
+    iterator = PeekableIterator(query)
+    step = timedelta_by_name(interval)
+    current_slice = stop_time
+
+    while current_slice > start_time:
+        row = iterator.peek()
+
+        if row and row.date == current_slice:
+            yield current_slice, tuple(getattr(row, c) for c in columns)
+            iterator.next()
+        else:
+            yield current_slice, tuple(0 for c in columns)
+
+        # moving backwards a month isn't a fixed timedelta -- special case it
+        if interval != "month":
+            current_slice -= step
+        else:
+            current_slice = decrement_month(current_slice)
+
+def fill_gaps(*args, **kwargs):
+    """Listify the generator returned by fill_gaps_generator for `memoize`."""
+    generator = fill_gaps_generator(*args, **kwargs)
+    return list(generator)
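
A sketch with a stubbed query (any iterable of row-like objects works,
since only .date and the named columns are read; the data is made up):

    from collections import namedtuple
    Row = namedtuple("Row", ["date", "unique_count"])

    stop = datetime.datetime(2012, 6, 20)
    start = stop - datetime.timedelta(days=3)
    rows = [Row(stop, 42)]  # newest-first, only one day present

    print fill_gaps("day", start, stop, rows, "unique_count")
    # [(datetime(2012, 6, 20), (42,)),
    #  (datetime(2012, 6, 19), (0,)),
    #  (datetime(2012, 6, 18), (0,))]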
+time_range_by_interval = dict(hour=datetime.timedelta(days=4),
+                              day=datetime.timedelta(weeks=8),
+                              month=datetime.timedelta(weeks=52))
+
+def time_range(interval):
+    """Calculate the time range to view for a given level of precision.
+
+    The coarser our granularity, the more history we'll want to see.
+
+    """
+    # the stop time is the most recent slice-time; get this by truncating
+    # the appropriate amount from the current time
+    stop_time = datetime.datetime.utcnow()
+    stop_time = stop_time.replace(minute=0, second=0, microsecond=0)
+    if interval in ("day", "month"):
+        stop_time = stop_time.replace(hour=0)
+    if interval == "month":
+        stop_time = stop_time.replace(day=1)
+
+    # then the start time is easy to work out
+    range = time_range_by_interval[interval]
+    start_time = stop_time - range
+
+    return start_time, stop_time
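
Concretely, if utcnow() were 2012-06-20 15:42 (a made-up moment):

    # time_range("hour")  -> (2012-06-16 15:00, 2012-06-20 15:00)  4 days back
    # time_range("day")   -> (2012-04-25 00:00, 2012-06-20 00:00)  8 weeks back
    # time_range("month") -> (2011-06-03 00:00, 2012-06-01 00:00)  52 weeks back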
+def make_history_query(cls, interval):
+    """Build a generic query showing the history of a given aggregate."""
+    start_time, stop_time = time_range(interval)
+    q = (Session.query(cls)
+         .filter(cls.date >= start_time))
+
+    # subscription stats doesn't have an interval (it's only daily)
+    if hasattr(cls, "interval"):
+        q = q.filter(cls.interval == interval)
+
+    q = q.order_by(desc(cls.date))
+
+    return start_time, stop_time, q
+
+def top_last_month(cls, key):
+    """Aggregate a listing of the top items (by uniques) last month.
+
+    We use the last month because it's guaranteed to be fully computed and
+    therefore will be more meaningful.
+
+    """
+    cur_month = datetime.date.today().replace(day=1)
+    last_month = decrement_month(cur_month)
+
+    q = (Session.query(cls)
+         .filter(cls.date == last_month)
+         .filter(cls.interval == "month")
+         .order_by(desc(cls.date), desc(cls.unique_count))
+         .limit(55))
+
+    return [(getattr(r, key), (r.unique_count, r.pageview_count))
+            for r in q.all()]
+
+def totals(cls, interval):
+    """Aggregate sitewide totals for self-serve promotion traffic.
+
+    We only aggregate codenames that start with a link type prefix, which
+    effectively filters out all DART / 300x100 etc. traffic numbers.
+
+    """
+    start_time, stop_time = time_range(interval)
+    q = (Session.query(cls.date, sum(cls.pageview_count).label("sum"))
+         .filter(cls.interval == interval)
+         .filter(cls.date > start_time)
+         .filter(cls.codename.startswith(Link._type_prefix))
+         .group_by(cls.date)
+         .order_by(desc(cls.date)))
+    return fill_gaps(interval, start_time, stop_time, q, "sum")
+
+def promotion_history(cls, codename, start, stop):
+    """Get hourly traffic for a self-serve promotion across all campaigns."""
+    q = (Session.query(cls)
+         .filter(cls.interval == "hour")
+         .filter(cls.codename == codename)
+         .filter(cls.date >= start)
+         .filter(cls.date <= stop)
+         .order_by(cls.date))
+    return [(r.date, (r.unique_count, r.pageview_count)) for r in q.all()]
+
+@memoize("traffic_last_modified", time=60 * 10)
+def get_traffic_last_modified():
+    """Guess how far behind the traffic processing system is."""
+    return (Session.query(SitewidePageviews.date)
+            .order_by(desc(SitewidePageviews.date))
+            .limit(1)
+            .one()).date
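
Callers can turn that into a lag estimate (sketch; assumes the traffic
tables are populated):

    last = get_traffic_last_modified()
    lag = datetime.datetime.utcnow() - last
    # e.g. warn in the admin UI when traffic is hours behind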
 class SitewidePageviews(Base):
     __tablename__ = "traffic_aggregate"
 
@@ -41,6 +270,13 @@ class SitewidePageviews(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", BigInteger())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
 class PageviewsBySubreddit(Base):
     __tablename__ = "traffic_subreddits"
@@ -51,6 +287,19 @@ class PageviewsBySubreddit(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, subreddit):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.subreddit == subreddit)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600 * 6)
+    def top_last_month(cls):
+        return top_last_month(cls, "subreddit")
+
 class PageviewsBySubredditAndPath(Base):
     __tablename__ = "traffic_srpaths"
@@ -71,6 +320,19 @@ class PageviewsByLanguage(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, lang):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.lang == lang)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600 * 6)
+    def top_last_month(cls):
+        return top_last_month(cls, "lang")
+
 class ClickthroughsByCodename(Base):
     __tablename__ = "traffic_click"
@@ -81,6 +343,24 @@ class ClickthroughsByCodename(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, codename):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.codename == codename)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def promotion_history(cls, codename, start, stop):
+        return promotion_history(cls, codename, start, stop)
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def historical_totals(cls, interval):
+        return totals(cls, interval)
+
 class TargetedClickthroughsByCodename(Base):
     __tablename__ = "traffic_clicktarget"
@@ -102,6 +382,29 @@ class AdImpressionsByCodename(Base):
     unique_count = Column("unique", Integer())
     pageview_count = Column("total", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600)
+    def history(cls, interval, codename):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.codename == codename)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "unique_count", "pageview_count")
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def promotion_history(cls, codename, start, stop):
+        return promotion_history(cls, codename, start, stop)
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def historical_totals(cls, interval):
+        return totals(cls, interval)
+
+    @classmethod
+    @memoize_traffic(time=3600)
+    def top_last_month(cls):
+        return top_last_month(cls, "codename")
+
 class TargetedImpressionsByCodename(Base):
     __tablename__ = "traffic_thingtarget"
@@ -121,6 +424,13 @@ class SubscriptionsBySubreddit(Base):
     date = Column(DateTime(), nullable=False, primary_key=True)
     subscriber_count = Column("unique", Integer())
 
+    @classmethod
+    @memoize_traffic(time=3600 * 6)
+    def history(cls, interval, subreddit):
+        start_time, stop_time, q = make_history_query(cls, interval)
+        q = q.filter(cls.subreddit == subreddit)
+        return fill_gaps(interval, start_time, stop_time, q,
+                         "subscriber_count")
+
 # create the tables if they don't exist
 if g.db_create_tables:

View File

@@ -101,7 +101,7 @@ ${thing.viewers}
       ${num(ucli)}${'*' if thing.preliminary else ''}
     </td>
     <td>${num(ncli)}</td>
-    <td>${("%.2f%%" % (float(100*ucli) / uimp)) if nimp else "--.--%"}</td>
+    <td>${("%.2f%%" % (float(100*ucli) / uimp)) if uimp else "--.--%"}</td>
     <td>${("%.2f%%" % (float(100*ncli) / nimp)) if nimp else "--.--%"}</td>
   </tr>
 %if thing.preliminary:

View File

@@ -62,7 +62,7 @@ ${unsafe(js.use('flot'))}
   </tr>
 %endif
 </tr>
-%for x, (date, data) in enumerate(reversed(thing.day_data)):
+%for x, (date, data) in enumerate(thing.day_data):
   <tr class="${'odd' if x % 2 else 'even'} ${'max' if data[0] == umax else 'min' if data[0] == umin else ''}">
     <td>${date.strftime("%Y-%m-%d")}</td>
     <%