Fix issues with use of amqp for SponsorBoxWeights

This commit is contained in:
Keith Mitchell
2012-10-24 14:16:12 -07:00
parent 3602982f34
commit 7e9f9fb488

View File

@@ -475,7 +475,7 @@ def reject_promotion(link, reason = None):
# while we're doing work here, it will correctly exclude it
set_promote_status(link, PROMOTE_STATUS.rejected)
links, = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
if link._fullname in links:
PromotionLog.add(link, 'Marked promotion for rejection')
queue_changed_promo(link, "rejected")
@@ -692,17 +692,17 @@ def get_live_promotions(srids, from_permacache=True):
return links, weights
def set_live_promotions(links, weights, which=("cass", "permacache")):
if "cass" in which:
timer = g.stats.get_timer("promote.set_live.cass")
timer.start()
SponsorBoxWeightings.set_from_weights(weights)
timer.stop()
def set_live_promotions(links, weights, which=("permacache",)):
if "permacache" in which:
timer = g.stats.get_timer("promote.set_live.permacache")
timer.start()
g.permacache.set(promotion_key(), (links, weights))
timer.stop()
if "cass" in which:
timer = g.stats.get_timer("promote.set_live.cass")
timer.start()
SponsorBoxWeightings.set_from_weights(weights)
timer.stop()
# Gotcha: even if links are scheduled and authorized, they won't be added to
# current promotions until they're actually charged, so make sure to call
@@ -930,18 +930,61 @@ def get_total_run(link):
return earliest, latest
def Run(offset = 0):
class PromotionLog(tdb_cassandra.View):
_use_db = True
_connection_pool = 'main'
_compare_with = TIME_UUID_TYPE
@classmethod
def _rowkey(cls, link):
return link._fullname
@classmethod
def add(cls, link, text):
name = c.user.name if c.user_is_loggedin else "<AUTOMATED>"
now = datetime.now(g.tz).strftime("%Y-%m-%d %H:%M:%S")
text = "[%s: %s] %s" % (name, now, text)
rowkey = cls._rowkey(link)
column = {uuid1(): filters._force_utf8(text)}
cls._set_values(rowkey, column)
# Dual write to old promotion_log attribute
log = list(getattr(link, "promotion_log", []))
log.append(text)
link.promotion_log = map(filters._force_utf8, log)
link._commit()
return text
@classmethod
def get(cls, link):
rowkey = cls._rowkey(link)
try:
row = cls._byID(rowkey)
except tdb_cassandra.NotFound:
return []
tuples = sorted(row._values().items(), key=lambda t: t[0].time)
return [t[1] for t in tuples]
def Run(offset=0, verbose=True):
"""reddit-job-update_promos: Intended to be run hourly to pull in
scheduled changes to ads
"""
if verbose:
print "promote.py:Run() - charge_pending()"
charge_pending(offset = offset + 1)
charge_pending(offset = offset)
if verbose:
print "promote.py:Run() - amqp.add_item()"
amqp.add_item(UPDATE_QUEUE, json.dumps(QUEUE_ALL),
delivery_mode=amqp.DELIVERY_TRANSIENT)
amqp.worker.join()
if verbose:
print "promote.py:Run() - finished"
def run_changed(drain=False, limit=100, sleep_time=10, verbose=False):
def run_changed(drain=False, limit=100, sleep_time=10, verbose=True):
"""reddit-consumer-update_promos: amqp consumer of update_promos_q
Handles asynch accepting/rejecting of ads that are scheduled to be live
@@ -954,14 +997,14 @@ def run_changed(drain=False, limit=100, sleep_time=10, verbose=False):
if QUEUE_ALL in items:
# QUEUE_ALL is just an indicator to run make_daily_promotions.
# There's no promotion log to update in this case.
print "Received QUEUE_ALL message"
items.remove(QUEUE_ALL)
make_daily_promotions()
links = Link._by_fullname([i["link"] for i in items])
for item in items:
PromotionLog.add(links[c.link_id],
PromotionLog.add(links[item['link']],
"Finished remaking current promotions (this link "
" was: %(message)s" % item,
commit=True)
"was: %(message)s" % item)
amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain,
sleep_time=sleep_time, verbose=verbose)