Store live promotions as structured C* data

Initially this is dual-write only
Keith Mitchell
2012-09-12 15:32:34 -07:00
parent 6828c6940f
commit e34e8d8d3e
2 changed files with 277 additions and 28 deletions
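The dual-write approach named in the commit message works like this: every write lands in both the existing permacache entry and the new Cassandra (C*) model, while reads stay on permacache until the Cassandra data has been verified. A minimal standalone sketch of the pattern, using plain dicts as stand-ins for the two backends (not the repo's real interfaces):

    class DualWriteStore(object):
        def __init__(self, old_store, new_store, read_from_new=False):
            self.old_store = old_store
            self.new_store = new_store
            # Flip this once the new backend has been verified.
            self.read_from_new = read_from_new

        def set(self, key, value):
            # Dual-write: keep both backends in sync during the migration.
            self.old_store[key] = value
            self.new_store[key] = value

        def get(self, key):
            # Reads stay on the old path until the cutover.
            store = self.new_store if self.read_from_new else self.old_store
            return store.get(key)

    store = DualWriteStore({}, {})
    store.set('current_promotions:1', (set(['t3_abc']), {}))
    print store.get('current_promotions:1')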

r2/lib/promote.py

@@ -23,6 +23,7 @@
from __future__ import with_statement
from r2.models import *
from r2.models.bidding import SponsorBoxWeightings, WeightingRef
from r2.lib.wrapped import Wrapped
from r2.lib import authorize
from r2.lib import emailer, filters
@@ -558,9 +559,14 @@ def accept_promotion(link):
def reject_promotion(link, reason = None):
PromotionLog.add(link, 'status update: rejected')
# update the query queue
# Since status is updated first,
# if make_daily_promotions happens to run
# while we're doing work here, it will correctly exclude it
set_status(link, STATUS.rejected)
# check to see if this link is a member of the current live list
links, weighted = get_live_promotions()
# Updates just the permacache list
# permacache doesn't check the srids list; send an empty list
links, weighted = get_live_promotions([], _use_cass=False)
if link._fullname in links:
links.remove(link._fullname)
for k in list(weighted.keys()):
@@ -568,18 +574,25 @@ def reject_promotion(link, reason = None):
if lid != link._fullname]
if not weighted[k]:
del weighted[k]
set_live_promotions((links, weighted))
set_live_promotions(links, weighted, which=("permacache",))
PromotionLog.add(link, 'dequeued')
# don't send a rejection email when the rejection was user initiated.
# Updates just the Cassandra version
campaigns = PromoCampaign._by_link(link._id)
subreddits = Subreddit._by_name([c.sr_name for c in campaigns],
return_dict=False)
SponsorBoxWeightings.remove_link(link._fullname, subreddits)
# Send a rejection email (unless the advertiser requested the reject)
if not c.user or c.user._id != link.author_id:
emailer.reject_promo(link, reason = reason)
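# A worked example of the permacache dequeue above, using toy data and
# made-up fullnames: drop the rejected link from the live set, strip it
# from each subreddit's weighted list, and delete lists that become empty.
links = set(['t3_aa', 't3_bb'])
weighted = {'pics': [('t3_aa', 100, 't8_1'), ('t3_bb', 50, 't8_2')],
            'funny': [('t3_bb', 50, 't8_2')]}
rejected = 't3_bb'
links.remove(rejected)
for k in list(weighted.keys()):
    weighted[k] = [(lid, w, cid) for lid, w, cid in weighted[k]
                   if lid != rejected]
    if not weighted[k]:
        del weighted[k]
# links == set(['t3_aa']); weighted == {'pics': [('t3_aa', 100, 't8_1')]}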
def unapprove_promotion(link):
PromotionLog.add(link, 'status update: unapproved')
# update the query queue
set_status(link, STATUS.unseen)
links, weighted = get_live_promotions()
def accepted_campaigns(offset=0):
now = promo_datetime_now(offset=offset)
@@ -722,11 +735,46 @@ def weight_schedule(by_sr):
def promotion_key():
return "current_promotions:1"
def get_live_promotions():
return g.permacache.get(promotion_key()) or (set(), {})
def get_live_promotions(srids, _use_cass=False):
if _use_cass:
timer = g.stats.get_timer("promote.get_live.cass")
timer.start()
links = set()
weights = {}
find_srids = set(srids)
if '' in find_srids:
find_srids.remove('')
find_srids.add(SponsorBoxWeightings.DEFAULT_SR_ID)
ads = SponsorBoxWeightings.load_multi(find_srids)
for srid, refs in ads.iteritems():
links.update([ref.data['link'] for ref in refs])
promos = [ref.to_promo() for ref in refs]
if srid == SponsorBoxWeightings.DEFAULT_SR_ID:
srid = ''
elif srid == SponsorBoxWeightings.ALL_ADS_ID:
srid = 'all'
weights[srid] = promos
timer.stop()
else:
timer = g.stats.get_timer("promote.get_live.permacache")
timer.start()
links, weights = g.permacache.get(promotion_key()) or (set(), {})
timer.stop()
return links, weights
def set_live_promotions(x):
return g.permacache.set(promotion_key(), x)
def set_live_promotions(links, weights, which=("cass", "permacache")):
if "cass" in which:
timer = g.stats.get_timer("promote.set_live.cass")
timer.start()
SponsorBoxWeightings.set_from_weights(weights)
timer.stop()
if "permacache" in which:
timer = g.stats.get_timer("promote.set_live.permacache")
timer.start()
g.permacache.set(promotion_key(), (links, weights))
timer.stop()
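# A hypothetical shadow-read check for the dual-write period (not part of
# this diff): fetch from both backends and log any disagreement, so the
# Cassandra path can be validated before reads are cut over to it.
def compare_live_promotions(srids):
    cass_links, cass_weights = get_live_promotions(srids, _use_cass=True)
    pc_links, pc_weights = get_live_promotions(srids, _use_cass=False)
    if cass_links != pc_links:
        g.log.warning("live promo mismatch: only_cass=%r only_permacache=%r",
                      cass_links - pc_links, pc_links - cass_links)
    return cass_links == pc_links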
# Gotcha: even if links are scheduled and authorized, they won't be added to
# current promotions until they're actually charged, so make sure to call
# charge_pending() before make_daily_promotions()
@@ -738,8 +786,6 @@ def make_daily_promotions(offset = 0, test = False):
test - if True, new schedule will be generated but not launched
Raises Exception with list of campaigns that had errors if there were any
'''
old_links = set([])
schedule = get_scheduled(offset)
all_links = set([l._fullname for l in schedule['links']])
error_campaigns = schedule['error_campaigns']
@@ -755,18 +801,16 @@ def make_daily_promotions(offset = 0, test = False):
if not test:
l._commit()
x = get_live_promotions()
if x:
old_links, old_weights = x
# links that need to be promoted
new_links = all_links - old_links
# links that have already been promoted
old_links = old_links - all_links
else:
new_links = links
old_links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
# links that need to be promoted
new_links = all_links - old_links
# links that have already been promoted
old_links = old_links - all_links
links = Link._by_fullname(new_links.union(old_links), data = True,
return_dict = True)
for l in old_links:
if is_promoted(links[l]):
if test:
@@ -793,7 +837,7 @@ def make_daily_promotions(offset = 0, test = False):
weighted = dict((srs[k], v) for k, v in weighted.iteritems())
if not test:
set_live_promotions((all_links, weighted))
set_live_promotions(all_links, weighted)
else:
print (all_links, weighted)
@@ -822,7 +866,7 @@ def get_promotion_list(user, site):
#@memoize('get_promotions_cached', time = 10 * 60)
def get_promotions_cached(sites):
p = get_live_promotions()
p = get_live_promotions(sites)
if p:
links, promo_dict = p
available = {}
@@ -838,7 +882,7 @@ def get_promotions_cached(sites):
norm = sum(available.values())
# return a sorted list of (link, norm_weight)
return [(l, available[l] / norm, campaigns[l]) for l in links]
return []
def randomized_promotion_list(user, site):
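get_promotions_cached now returns (link, normalized_weight, campaign) triples whose weights sum to 1, which sets up randomized_promotion_list to make a roulette-wheel draw. A sketch of such a draw over that return value (weighted_pick is a hypothetical helper; the real selection code is outside this diff):

    import random

    def weighted_pick(promos):
        # promos: [(link_fullname, norm_weight, campaign_fullname), ...]
        draw = random.random()
        tally = 0.0
        for link_fn, weight, campaign_fn in promos:
            tally += weight
            if draw < tally:
                return link_fn, campaign_fn
        # Guard against floating-point shortfall in the running tally.
        return promos[-1][0], promos[-1][2]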

r2/models/bidding.py

@@ -20,6 +20,13 @@
# Inc. All Rights Reserved.
###############################################################################
import datetime
import itertools
import json
import random
import pycassa
from pylons import g, request
from sqlalchemy import Column, String, DateTime, Date, Float, Integer, Boolean,\
BigInteger, func as safunc, and_, or_
from sqlalchemy.exc import IntegrityError
@@ -28,14 +35,14 @@ from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.dialects.postgresql.base import PGInet as Inet
from sqlalchemy.ext.declarative import declarative_base
from pylons import g
from r2.lib.utils import Enum
from r2.lib.utils import Enum, fetch_things2
from r2.models.account import Account
from r2.models import Link
from r2.models import Link, Subreddit
import r2.lib.db.operators as db_ops
from r2.lib.db.thing import Thing, NotFound
from pylons import request
import r2.lib.db.tdb_cassandra as tdb_cassandra
from r2.lib.memoize import memoize
import datetime
engine = g.dbm.get_engine('authorize')
@@ -585,6 +592,204 @@ class PromotionWeights(Sessionized, Base):
d += datetime.timedelta(1)
return res
class WeightingRef(object):
def __init__(self, weight, data):
self.data = data
self.data['weight'] = weight
self.weight = weight
@classmethod
def from_promo(cls, link_fn, weight, campaign_fn):
data = {"link": link_fn, "campaign": campaign_fn, "weight": weight}
return cls(weight, data)
@classmethod
def from_cass(cls, string):
data = json.loads(string)
return cls(data['weight'], data)
def to_promo(self):
return (self.data['link'], self.data['weight'], self.data['campaign'])
def to_cass(self):
return json.dumps(self.data)
def __repr__(self):
return "%s(%r, %r)" % (self.__class__.__name__, self.weight,
self.data)
def __str__(self):
return "WeightingRef: %s" % self.to_cass()
class SponsorBoxWeightings(object):
__metaclass__ = tdb_cassandra.ThingMeta
_use_db = True
_read_consistency_level = tdb_cassandra.CL.ONE
# Writes use QUORUM (reads use ONE) so stale data doesn't get pulled back
# into caches; only the cron job and reject_promotion will be writing.
_write_consistency_level = tdb_cassandra.CL.QUORUM
_connection_pool = 'main'
_compare_with = pycassa.INT_TYPE
_str_props = ()
_type_prefix = None
_cf_name = None
_SCALE = 2**32
# TTL for the timestamped rows. Set to 1 day so that we can examine
# the data if something goes wrong.
TTL = datetime.timedelta(days=1).total_seconds()
DEFAULT_SR_ID = 0
# TODO The concept of an "All ads row" should not be needed
# after the permacache implementation is removed.
ALL_ADS_ID = -1
_IDX_ROWKEY_FMT = '%s.latest'
_IDX_COLUMN_KEY = 0
class ID(int):
@property
def _id(self):
return int(self)
def __init__(self, subreddit, timestamp, items, is_srid=True):
self.subreddit = self.ID(subreddit) if is_srid else subreddit
self.items = items
self.timeslot = tdb_cassandra.date_serializer.pack(timestamp)
@classmethod
def index_column(cls, subreddit):
return subreddit._id
@classmethod
def index_rowkey(cls, subreddit):
return cls._IDX_ROWKEY_FMT % subreddit._id
def rowkey(self):
return '%s.%s' % (self.subreddit._id, self.timeslot)
@classmethod
def get_latest_rowkey(cls, subreddit):
# Split into two layers so the memoize key can be an ID instead of a
# Subreddit object
return cls._get_latest_rowkey(cls.index_rowkey(subreddit))
@classmethod
@memoize('sponsor_box_weightings_rowkey', time=60 * 60, stale=False)
def _get_latest_rowkey(cls, idx_rowkey, _update=False):
if _update:
# Don't spoil the memoize cache with outdated data
rcl = tdb_cassandra.CL.QUORUM
else:
rcl = cls._read_consistency_level
try:
return cls._cf.get(idx_rowkey, columns=[cls._IDX_COLUMN_KEY],
read_consistency_level=rcl)[cls._IDX_COLUMN_KEY]
except tdb_cassandra.NotFoundException:
return None
@classmethod
def load_by_sr(cls, sr_id):
rowkey = cls.get_latest_rowkey(cls.ID(sr_id))
if rowkey:
data = cls._load(rowkey)
else:
data = []
return data
@classmethod
@memoize('sponsor_box_weightings__load', time=60 * 60, stale=True)
def _load(cls, rowkey, _update=False):
return [WeightingRef.from_cass(val)
for dummy, val in cls._cf.xget(rowkey)]
@classmethod
def load_multi(cls, sr_ids):
# TODO: Use multiget & fully read rows
return {sr_id: cls.load_by_sr(sr_id) for sr_id in sr_ids}
def set_as_latest(self):
self.set_timeslots()
self._set_as_latest()
@tdb_cassandra.will_write
def _set_as_latest(self):
rowkey = self.rowkey() if not self.empty else ''
self._cf.insert(self.index_rowkey(self.subreddit),
{self._IDX_COLUMN_KEY: rowkey},
ttl=self.TTL)
self._get_latest_rowkey(self.index_rowkey(self.subreddit),
_update=True)
@tdb_cassandra.will_write
def set_timeslots(self):
campaign_weights = self._calculate_weights()
if campaign_weights:
self._cf.insert(self.rowkey(), campaign_weights, ttl=self.TTL)
def _calculate_weights(self):
# Distribute the ads as "slices" of the range 0 - 2^32
# Each Ad gets the slice from the prior Ad's column key to its own
# column key.
weight_tally = 0
total_weight = float(sum(item.weight for item in self.items))
campaign_weights = {}
for item in self.items:
scaled_weight = int(item.weight / total_weight * self._SCALE)
weight_tally += scaled_weight
campaign_weights[weight_tally] = item.to_cass()
if weight_tally > self._SCALE:
raise ValueError("Awkward: Your math was a bit off.")
return campaign_weights
@property
def empty(self):
return not bool(self.items)
@classmethod
def set_from_weights(cls, all_weights):
weights = all_weights.copy()
all_ads = itertools.chain.from_iterable(all_weights.itervalues())
weights[cls.ALL_ADS_ID] = all_ads
if '' in weights:
weights[cls.DEFAULT_SR_ID] = weights.pop('')
timeslot = datetime.datetime.now(g.tz)
while weights:
srid, promos = weights.popitem()
weight_refs = [WeightingRef.from_promo(*promo) for promo in promos]
sbw = cls(srid, timeslot, weight_refs, is_srid=True)
sbw.set_as_latest()
# Clear out expired ads
query = Subreddit._query(sort=db_ops.desc('_date'), data=False)
for subreddit in fetch_things2(query):
if subreddit._id not in all_weights:
cls.clear(subreddit, timeslot=timeslot)
@classmethod
def clear(cls, srid, timeslot=None, is_srid=False):
timeslot = timeslot or datetime.datetime.now(g.tz)
cls(srid, timeslot, [], is_srid=is_srid).set_as_latest()
@classmethod
def remove_link(cls, link_fn, from_subreddits, include_all_sr=True):
now = datetime.datetime.now(g.tz)
if include_all_sr:
srs = itertools.chain(from_subreddits, [cls.ID(cls.ALL_ADS_ID)])
else:
srs = iter(from_subreddits)
for subreddit in srs:
current = cls.load_by_sr(subreddit._id)
updated = [r for r in current if r.data['link'] != link_fn]
cls(subreddit, now, updated, is_srid=False).set_as_latest()
def to_date(d):
if isinstance(d, datetime.datetime):
return d.date()
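_calculate_weights stores each ad under a column key equal to the running total of scaled weights, so the keys partition the range 0..2^32 into slices sized proportionally to the weights (hence the INT_TYPE comparator). A reader can then select an ad by drawing a random int and taking the first column key at or above the draw; against Cassandra this would map to a one-column slice query with column_start set to the draw. A standalone sketch of the selection logic, simulated over a plain dict (pick_ad is a hypothetical helper, not part of this diff):

    import bisect
    import random

    def pick_ad(campaign_weights, scale=2**32):
        # campaign_weights: {cumulative_scaled_weight: serialized WeightingRef}
        boundaries = sorted(campaign_weights)
        draw = random.randint(0, scale - 1)
        # The first boundary at or above the draw owns the slice containing it.
        idx = bisect.bisect_left(boundaries, draw)
        if idx == len(boundaries):
            # int() truncation in _calculate_weights can leave a small gap
            # just below 2**32; assign it to the last ad.
            idx -= 1
        return campaign_weights[boundaries[idx]]

    # Two ads weighted 3:1 -- the first owns roughly 75% of the range.
    weights = {int(0.75 * 2**32): 'ad_a', 2**32: 'ad_b'}
    print pick_ad(weights)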