Mirror of https://github.com/reddit-archive/reddit.git (synced 2026-01-23 05:48:11 -05:00)
* Add a new Cassandra CF for comment-sort caching
* Add JSONP support
* Support non-auto-renewing PayPal IPNs
* Gold accounting
* Gold features: comments tracking, larger comment limit
* Autorenewing gold
* Google checkout support
* Profile-page sorting for all
* Title-text on the logo
* Hardcache sharding and profiling
* Self serve enhancements
* Add /r/foo/faq
* Make self-centred reddits allow for more verbose selftext
* Much better down-handling of databases
* Add the ability to take a thread-dump from a running process
* Remove the comscore tracker
* Add comments to modqueue (does not back-populate)
111 lines
3.2 KiB
Python
#! /usr/bin/python

from r2.lib.utils import trunc_time
from r2.lib import amqp
from r2.lib.log import log_text
from pylons import g
from datetime import datetime
from time import sleep
import random as rand

import pickle

q = 'usage_q'

tz = g.display_tz

def check_dict(body):
    d = pickle.loads(body)

    # Reject malformed messages up front; "sampling_rate" is also required
    # below, so validate it here rather than letting a KeyError escape later.
    for k in ("start_time", "end_time", "sampling_rate", "action"):
        if k not in d:
            raise TypeError

    return d

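# Illustrative sketch of a message this consumer expects. The producer lives
# elsewhere in r2; the field values and the use of amqp.add_item here are
# assumptions, not the original producer code:
#
#     start = datetime.now(g.tz)
#     # ... handle the request ...
#     amqp.add_item('usage_q', pickle.dumps({
#         "start_time": start,
#         "end_time": datetime.now(g.tz),
#         "sampling_rate": 0.1,      # fraction of requests actually queued
#         "action": "GET_comments",
#     }))
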
def hund_from_start_and_end(start_time, end_time):
    elapsed = end_time - start_time

    # Elapsed time in hundredths of a second.
    hund_sec = int(elapsed.seconds * 100 +
                   elapsed.microseconds / 10000)

    # For sub-centisecond requests, round up to 1 probabilistically so the
    # accumulated totals stay unbiased in expectation.
    if hund_sec == 0:
        fraction = elapsed.microseconds / 10000.0
        if rand.random() < fraction:
            return 1
        else:
            return 0

    return hund_sec

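# Worked example (illustrative, not part of the original file):
#
#     >>> from datetime import timedelta
#     >>> e = timedelta(seconds=1, microseconds=234567)
#     >>> int(e.seconds * 100 + e.microseconds / 10000)   # Python 2 division
#     123
#
# A 4 ms request yields hund_sec == 0 and fraction == 0.4, so it counts as
# 1 hundredth about 40% of the time -- 0.4 hundredths (4 ms) on average.
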
def buckets(time):
    time = time.astimezone(tz)

    # Keep:
    #   Daily buckets for eight days
    #   1-hour buckets for 23 hours
    #   5-min buckets for two hours
    #
    # (If the 1-hour bucket lasts more than a day, things can get confusing;
    # at 12:30, the 12:xx column will have things from today at 12:20 and
    # from yesterday at 12:40. This could be worked around, but the code
    # over in pages.py is convoluted enough, so I'd rather not.)

    return [
        (86400 * 8,  time.strftime("%Y/%m/%d_xx:xx")),
        ( 3600 * 23, time.strftime("%Y/%m/%d_%H:xx")),
        ( 3600 * 2,  trunc_time(time, 5).strftime("%Y/%m/%d_%H:%M")),
    ]

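# Example (illustrative): for a local time of 2026/01/23 12:34, this returns
#
#     [(86400 * 8,  "2026/01/23_xx:xx"),
#      ( 3600 * 23, "2026/01/23_12:xx"),
#      ( 3600 * 2,  "2026/01/23_12:30")]
#
# i.e. (expiration in seconds, bucket label) pairs, assuming trunc_time(t, 5)
# truncates the minute down to a multiple of five.
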
def run(limit=1000, verbose=False):
    def myfunc(msgs, chan):
        if verbose:
            print "processing a batch"

        incrs = {}

        for msg in msgs:
            try:
                d = check_dict(msg.body)
            except TypeError:
                log_text("usage_q error", "wtf is %r" % msg.body, "error")
                continue

            hund_sec = hund_from_start_and_end(d["start_time"], d["end_time"])

            action = d["action"].replace("-", "_")

            # Scale the sampled observation back up to estimate the
            # unsampled totals.
            fudged_count = int(1 / d["sampling_rate"])
            fudged_elapsed = int(hund_sec / d["sampling_rate"])

            # Accumulate per-bucket deltas locally; each value is
            # [count, elapsed, expiration time].
            for exp_time, bucket in buckets(d["end_time"]):
                k = "%s-%s" % (bucket, action)
                incrs.setdefault(k, [0, 0, exp_time])
                incrs[k][0] += fudged_count
                incrs[k][1] += fudged_elapsed

        # Flush the batch's accumulated deltas to the hardcache.
        for k, (count, elapsed, exp_time) in incrs.iteritems():
            c_key = "profile_count-" + k
            e_key = "profile_elapsed-" + k

            if verbose:
                c_old = g.hardcache.get(c_key)
                e_old = g.hardcache.get(e_key)

            g.hardcache.accrue(c_key, delta=count, time=exp_time)
            g.hardcache.accrue(e_key, delta=elapsed, time=exp_time)

            if verbose:
                c_new = g.hardcache.get(c_key)
                e_new = g.hardcache.get(e_key)

                print "%s: %s -> %s" % (c_key, c_old, c_new)
                print "%s: %s -> %s" % (e_key, e_old, e_new)

        # A light batch means the queue is drained; back off briefly.
        if len(msgs) < limit / 2:
            if verbose:
                print "Sleeping..."
            sleep(10)

    amqp.handle_items(q, myfunc, limit=limit, drain=False, verbose=verbose,
                      sleep_time=30)
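
# Illustrative entry point (an assumption -- in the reddit codebase queue
# consumers are normally launched by the job infrastructure rather than by
# running the module directly):
#
#     if __name__ == '__main__':
#         run(limit=1000, verbose=True)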