From 7e9f9fb4888d3c93f35ec523dd9fd6fb415d08de Mon Sep 17 00:00:00 2001 From: Keith Mitchell Date: Wed, 24 Oct 2012 14:16:12 -0700 Subject: [PATCH] Fix issues with use of amqp for SponsorBoxWeights --- r2/r2/lib/promote.py | 67 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/r2/r2/lib/promote.py b/r2/r2/lib/promote.py index f81e44e4b..ae7312168 100644 --- a/r2/r2/lib/promote.py +++ b/r2/r2/lib/promote.py @@ -475,7 +475,7 @@ def reject_promotion(link, reason = None): # while we're doing work here, it will correctly exclude it set_promote_status(link, PROMOTE_STATUS.rejected) - links, = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0] + links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0] if link._fullname in links: PromotionLog.add(link, 'Marked promotion for rejection') queue_changed_promo(link, "rejected") @@ -692,17 +692,17 @@ def get_live_promotions(srids, from_permacache=True): return links, weights -def set_live_promotions(links, weights, which=("cass", "permacache")): - if "cass" in which: - timer = g.stats.get_timer("promote.set_live.cass") - timer.start() - SponsorBoxWeightings.set_from_weights(weights) - timer.stop() +def set_live_promotions(links, weights, which=("permacache",)): if "permacache" in which: timer = g.stats.get_timer("promote.set_live.permacache") timer.start() g.permacache.set(promotion_key(), (links, weights)) timer.stop() + if "cass" in which: + timer = g.stats.get_timer("promote.set_live.cass") + timer.start() + SponsorBoxWeightings.set_from_weights(weights) + timer.stop() # Gotcha: even if links are scheduled and authorized, they won't be added to # current promotions until they're actually charged, so make sure to call @@ -930,18 +930,61 @@ def get_total_run(link): return earliest, latest -def Run(offset = 0): +class PromotionLog(tdb_cassandra.View): + _use_db = True + _connection_pool = 'main' + _compare_with = TIME_UUID_TYPE + + @classmethod + def _rowkey(cls, link): + return link._fullname + + @classmethod + def add(cls, link, text): + name = c.user.name if c.user_is_loggedin else "" + now = datetime.now(g.tz).strftime("%Y-%m-%d %H:%M:%S") + text = "[%s: %s] %s" % (name, now, text) + rowkey = cls._rowkey(link) + column = {uuid1(): filters._force_utf8(text)} + cls._set_values(rowkey, column) + + # Dual write to old promotion_log attribute + log = list(getattr(link, "promotion_log", [])) + log.append(text) + link.promotion_log = map(filters._force_utf8, log) + link._commit() + return text + + @classmethod + def get(cls, link): + rowkey = cls._rowkey(link) + try: + row = cls._byID(rowkey) + except tdb_cassandra.NotFound: + return [] + tuples = sorted(row._values().items(), key=lambda t: t[0].time) + return [t[1] for t in tuples] + + +def Run(offset=0, verbose=True): """reddit-job-update_promos: Intended to be run hourly to pull in scheduled changes to ads """ + if verbose: + print "promote.py:Run() - charge_pending()" charge_pending(offset = offset + 1) charge_pending(offset = offset) + if verbose: + print "promote.py:Run() - amqp.add_item()" amqp.add_item(UPDATE_QUEUE, json.dumps(QUEUE_ALL), delivery_mode=amqp.DELIVERY_TRANSIENT) + amqp.worker.join() + if verbose: + print "promote.py:Run() - finished" -def run_changed(drain=False, limit=100, sleep_time=10, verbose=False): +def run_changed(drain=False, limit=100, sleep_time=10, verbose=True): """reddit-consumer-update_promos: amqp consumer of update_promos_q Handles asynch accepting/rejecting of ads that are scheduled to be live @@ -954,14 +997,14 @@ def run_changed(drain=False, limit=100, sleep_time=10, verbose=False): if QUEUE_ALL in items: # QUEUE_ALL is just an indicator to run make_daily_promotions. # There's no promotion log to update in this case. + print "Received QUEUE_ALL message" items.remove(QUEUE_ALL) make_daily_promotions() links = Link._by_fullname([i["link"] for i in items]) for item in items: - PromotionLog.add(links[c.link_id], + PromotionLog.add(links[item['link']], "Finished remaking current promotions (this link " - " was: %(message)s" % item, - commit=True) + "was: %(message)s" % item) amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain, sleep_time=sleep_time, verbose=verbose)