LiveAdWeights: Cache-friendly, non-permacache ads

Keith Mitchell
2012-11-01 15:52:36 -07:00
parent 1ef424958e
commit 940b54fc84
3 changed files with 119 additions and 203 deletions

View File: r2/lib/promote.py

@@ -27,8 +27,8 @@ import json
 import time

 from r2.models import *
-from r2.models.bidding import SponsorBoxWeightings, WeightingRef
 from r2.models.keyvalue import NamedGlobals
+from r2.models.promo import LiveAdWeights
 from r2.lib.wrapped import Wrapped
 from r2.lib import authorize
 from r2.lib import emailer
@@ -475,7 +475,7 @@ def reject_promotion(link, reason = None):
     # while we're doing work here, it will correctly exclude it
     set_promote_status(link, PROMOTE_STATUS.rejected)
-    links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
+    links = get_live_promotions([LiveAdWeights.ALL_ADS])[0]
     if link._fullname in links:
         PromotionLog.add(link, 'Marked promotion for rejection')
         queue_changed_promo(link, "rejected")
@@ -669,20 +669,9 @@ def get_live_promotions(srids, from_permacache=True):
         timer = g.stats.get_timer("promote.get_live.cass")
         timer.start()
         links = set()
-        weights = {}
-        find_srids = set(srids)
-        if '' in find_srids:
-            find_srids.remove('')
-            find_srids.add(SponsorBoxWeightings.FRONT_PAGE)
-        ads = SponsorBoxWeightings.load_multi(find_srids)
-        for srid, refs in ads.iteritems():
-            links.update(ref.data['link'] for ref in refs)
-            promos = [ref.to_promo() for ref in refs]
-            if srid == SponsorBoxWeightings.FRONT_PAGE:
-                srid = ''
-            elif srid == SponsorBoxWeightings.ALL_ADS_ID:
-                srid = 'all'
-            weights[srid] = promos
+        weights = LiveAdWeights.get(srids)
+        for promos in weights.itervalues():
+            links.update(link_fn for link_fn, weight, campaign_fn in promos)
         timer.stop()
     else:
         timer = g.stats.get_timer("promote.get_live.permacache")
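For orientation, `LiveAdWeights.get` hands back per-subreddit lists of `(link_fullname, weight, campaign_fullname)` tuples, keyed the same way the caller asked (empty string for the front page). A minimal sketch of the shape, with hypothetical fullnames:

# Sketch: consuming LiveAdWeights.get(), with hypothetical example values.
weights = {
    '': [('t3_aa', 0.25, 't8_x'), ('t3_bb', 0.75, 't8_y')],  # front page
    't5_2qh0u': [('t3_cc', 1.0, 't8_z')],
}
links = set()
for promos in weights.values():
    links.update(link_fn for link_fn, weight, campaign_fn in promos)
assert links == {'t3_aa', 't3_bb', 't3_cc'}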
@@ -701,7 +690,23 @@ def set_live_promotions(links, weights, which=("cass", "permacache")):
if "cass" in which:
timer = g.stats.get_timer("promote.set_live.cass")
timer.start()
SponsorBoxWeightings.set_from_weights(weights)
# First, figure out which subreddits have had ads recently
today = promo_datetime_now()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)
promo_weights = PromotionWeights.get_campaigns(yesterday, tomorrow)
subreddit_names = set(p.sr_name for p in promo_weights)
subreddits = Subreddit._by_name(subreddit_names).values()
# Set the default for those subreddits to no ads
all_weights = {sr._id: [] for sr in subreddits}
# Mix in the currently live ads
all_weights.update(weights)
if '' in all_weights:
all_weights[LiveAdWeights.FRONT_PAGE] = all_weights.pop('')
LiveAdWeights.set_all_from_weights(all_weights)
timer.stop()
# Gotcha: even if links are scheduled and authorized, they won't be added to
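The effect of the replacement block: every subreddit with a campaign in the yesterday-to-tomorrow window gets an explicit (possibly empty) entry, so rows are rewritten rather than left to linger until their TTL. A sketch with hypothetical IDs:

# Sketch with hypothetical IDs: recently-advertised subreddits default
# to "no ads", then the currently live weights overwrite those defaults.
recent_sr_ids = [10, 11, 12]            # had campaigns yesterday..tomorrow
live = {10: [('t3_aa', 1.0, 't8_x')],   # currently running
        '': [('t3_bb', 1.0, 't8_y')]}   # '' is the front page
all_weights = {sr_id: [] for sr_id in recent_sr_ids}
all_weights.update(live)
if '' in all_weights:
    all_weights['frontpage'] = all_weights.pop('')  # LiveAdWeights.FRONT_PAGE
# all_weights == {10: [('t3_aa', 1.0, 't8_x')], 11: [], 12: [],
#                 'frontpage': [('t3_bb', 1.0, 't8_y')]}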
@@ -729,7 +734,7 @@ def make_daily_promotions(offset = 0, test = False):
         if not test:
             l._commit()

-    old_links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
+    old_links = get_live_promotions([LiveAdWeights.ALL_ADS])[0]

     # links that need to be promoted
     new_links = all_links - old_links
@@ -794,7 +799,7 @@ def get_promotion_list(user, site):
 def get_promotions_cached(sites):
-    p = get_live_promotions(sites)
+    p = get_live_promotions(sites, from_permacache=True)
     if p:
         links, promo_dict = p
         available = {}

View File: r2/models/bidding.py

@@ -466,188 +466,10 @@ class PromotionWeights(Sessionized, Base):
         return res


-class WeightingRef(object):
-    def __init__(self, weight, data):
-        self.data = data
-        self.data['weight'] = weight
-        self.weight = weight
-
-    @classmethod
-    def from_promo(cls, link_fn, weight, campaign_fn):
-        data = {"link": link_fn, "campaign": campaign_fn, "weight": weight}
-        return cls(weight, data)
-
-    @classmethod
-    def from_cass(cls, string):
-        data = json.loads(string)
-        return cls(data['weight'], data)
-
-    def to_promo(self):
-        return (self.data['link'], self.data['weight'], self.data['campaign'])
-
-    def to_cass(self):
-        return json.dumps(self.data)
-
-    def __repr__(self):
-        return "%s(%r, %r)" % (self.__class__.__name__, self.weight,
-                               self.data)
-
-    def __str__(self):
-        return "WeightingRef: %s" % self.to_cass()
-
-
-class SponsorBoxWeightings(object):
-    __metaclass__ = tdb_cassandra.ThingMeta
-    _use_db = True
-    _read_consistency_level = tdb_cassandra.CL.ONE
-    # To avoid spoiling caches when pulling back? Only the cron job
-    # and reject_promo will be writing.
-    _write_consistency_level = tdb_cassandra.CL.QUORUM
-    _connection_pool = 'main'
-    _compare_with = pycassa.INT_TYPE
-    _str_props = ()
-    _type_prefix = None
-    _cf_name = None
-
-    _SCALE = 2**32
-    # TTL is set to 1 day so that we can examine
-    # the data if something goes wrong.
-    _ttl = datetime.timedelta(days=1)
-
-    FRONT_PAGE = 0
-    # TODO The concept of an "All ads row" should not be needed
-    # after the permacache implementation is removed.
-    ALL_ADS_ID = -1
-
-    _IDX_ROWKEY_FMT = '%s/index'
-    _IDX_COLUMN_KEY = 0
-
-    class ID(int):
-        @property
-        def _id(self):
-            return int(self)
-
-    def __init__(self, subreddit, timestamp, items, is_srid=True,
-                 mutator=None):
-        self.subreddit = self.ID(subreddit) if is_srid else subreddit
-        self.items = items
-        self.timeslot = tdb_cassandra.date_serializer.pack(timestamp)
-        # If a Mutator is not passed in, the ColumnFamily has a compatible
-        # insert() method
-        self.mutator = mutator or self._cf
-
-    @classmethod
-    def index_rowkey(cls, subreddit):
-        return cls._IDX_ROWKEY_FMT % subreddit._id
-
-    def rowkey(self):
-        return '%s/%s' % (self.subreddit._id, self.timeslot)
-
-    @classmethod
-    def get_latest_rowkey(cls, subreddit):
-        idx_rowkey = cls.index_rowkey(subreddit)
-        try:
-            row = cls._cf.get(idx_rowkey, columns=[cls._IDX_COLUMN_KEY])
-        except tdb_cassandra.NotFoundException:
-            return None
-        else:
-            return row[cls._IDX_COLUMN_KEY]
-
-    @classmethod
-    def load_by_sr(cls, sr_id):
-        rowkey = cls.get_latest_rowkey(cls.ID(sr_id))
-        if rowkey:
-            data = cls._load(rowkey)
-        else:
-            data = []
-        return data
-
-    @classmethod
-    def _load(cls, rowkey):
-        return [WeightingRef.from_cass(val)
-                for dummy, val in cls._cf.xget(rowkey)]
-
-    @classmethod
-    def load_multi(cls, sr_ids):
-        # TODO: Use multiget & fully read rows
-        return {sr_id: cls.load_by_sr(sr_id) for sr_id in sr_ids}
-
-    def set_as_latest(self):
-        self.set_timeslots()
-        self._set_as_latest()
-
-    @tdb_cassandra.will_write
-    def _set_as_latest(self):
-        rowkey = self.rowkey() if not self.empty else ''
-        self.mutator.insert(self.index_rowkey(self.subreddit),
-                            {self._IDX_COLUMN_KEY: rowkey},
-                            ttl=self._ttl)
-
-    @tdb_cassandra.will_write
-    def set_timeslots(self):
-        campaign_weights = self._calculate_weights()
-        if campaign_weights:
-            self.mutator.insert(self.rowkey(), campaign_weights, ttl=self._ttl)
-
-    def _calculate_weights(self):
-        # Distribute the ads as "slices" of the range 0 - 2^32
-        # Each Ad gets the slice from the prior Ad's column key to its own
-        # column key.
-        weight_tally = 0
-        total_weight = float(sum(item.weight for item in self.items))
-        campaign_weights = {}
-        for item in self.items:
-            scaled_weight = int(item.weight / total_weight * self._SCALE)
-            weight_tally += scaled_weight
-            campaign_weights[weight_tally] = item.to_cass()
-        if weight_tally > self._SCALE:
-            raise ValueError("Awkward: Your math was a bit off.")
-        return campaign_weights
-
-    @property
-    def empty(self):
-        return not bool(self.items)
-
-    @classmethod
-    def set_from_weights(cls, all_weights):
-        weights = all_weights.copy()
-        all_ads = itertools.chain.from_iterable(all_weights.itervalues())
-        weights[cls.ALL_ADS_ID] = all_ads
-        if '' in weights:
-            weights[cls.FRONT_PAGE] = weights.pop('')
-
-        today = datetime.datetime.now(g.tz)
-        while weights:
-            srid, promos = weights.popitem()
-            weight_refs = [WeightingRef.from_promo(*promo) for promo in promos]
-            sbw = cls(srid, today, weight_refs, is_srid=True)
-            sbw.set_as_latest()
-
-        # Clear out expired ads
-        # First, figure out which subreddits may have had ads
-        yesterday = today - datetime.timedelta(days=1)
-        tomorrow = today + datetime.timedelta(days=1)
-        promo_weights = PromotionWeights.get_campaigns(yesterday, tomorrow)
-        subreddit_names = set(p.sr_name for p in promo_weights)
-        subreddits = Subreddit._by_name(subreddit_names).values()
-        # Next, clear out any ads from those subreddits, if and only if
-        # that subreddit wasn't updated already during this function call
-        mutator = cls._cf.batch()
-        for sr in subreddits:
-            if sr._id not in all_weights:
-                cls.clear(sr, timeslot=today, mutator=mutator)
-        mutator.send()
-
-    @classmethod
-    def clear(cls, srid, timeslot=None, is_srid=False, mutator=None):
-        timeslot = timeslot or datetime.datetime.now(g.tz)
-        sbw = cls(srid, timeslot, [], is_srid=is_srid, mutator=mutator)
-        sbw.set_as_latest()


 def to_date(d):
     if isinstance(d, datetime.datetime):
         return d.date()
     return d


 # do all the leg work of creating/connecting to tables
 if g.db_create_tables:
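For reference, the removed `_calculate_weights` packed ads into cumulative slices of the 0 to 2^32 column-key space. A standalone sketch of that arithmetic, with hypothetical weights:

# Sketch of the cumulative-weight slicing from _calculate_weights
# (example values hypothetical; not part of the commit).
SCALE = 2 ** 32
weights = [1.0, 3.0]                  # two ads, relative weights 1:3
total = float(sum(weights))
tally = 0
columns = []
for w in weights:
    tally += int(w / total * SCALE)   # cumulative column key per ad
    columns.append(tally)
# columns == [2**30, 2**32]: ad 1 owns (0, 2**30], ad 2 owns (2**30, 2**32],
# i.e. a 25%/75% split of the keyspace.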

View File: r2/models/promo.py

@@ -20,16 +20,19 @@
 # Inc. All Rights Reserved.
 ###############################################################################
-from datetime import datetime
+from datetime import datetime, timedelta
 from uuid import uuid1
+import json
 from pylons import g, c
 from r2.lib import filters
+from r2.lib.cache import sgm
 from r2.lib.db import tdb_cassandra
 from r2.lib.db.thing import Thing, NotFound
 from r2.lib.memoize import memoize
 from r2.lib.utils import Enum
+from r2.models.subreddit import Subreddit

 PROMOTE_STATUS = Enum("unpaid", "unseen", "accepted", "rejected",
@@ -38,7 +41,6 @@ PROMOTE_STATUS = Enum("unpaid", "unseen", "accepted", "rejected",
 @memoize("get_promote_srid")
 def get_promote_srid(name = 'promos'):
-    from r2.models.subreddit import Subreddit
     try:
         sr = Subreddit._by_name(name, stale=True)
     except NotFound:
@@ -134,3 +136,90 @@ class PromotionLog(tdb_cassandra.View):
         return [t[1] for t in tuples]
+
+
+class LiveAdWeights(object):
+    """Data store for per-subreddit lists of currently running ads"""
+    __metaclass__ = tdb_cassandra.ThingMeta
+    _use_db = True
+    _connection_pool = 'main'
+    _type_prefix = None
+    _cf_name = None
+    _compare_with = tdb_cassandra.ASCII_TYPE
+    # TTL is 12 hours, to avoid unexpected ads running indefinitely.
+    # See note in set_all_from_weights() for more information.
+    _ttl = timedelta(hours=12)
+
+    column = 'adweights'
+    cache = g.cache
+    cache_prefix = 'live-adweights-'
+
+    ALL_ADS = 'all'
+    FRONT_PAGE = 'frontpage'
+
+    def __init__(self):
+        raise NotImplementedError()
+
+    @classmethod
+    def to_columns(cls, weights):
+        """Generate a serializable dict representation of the weights"""
+        return {cls.column: json.dumps(weights)}
+
+    @classmethod
+    def from_columns(cls, columns):
+        """Given a (serializable) dict, restore the weights"""
+        weights = json.loads(columns.get(cls.column, '[]')) if columns else []
+        # JSON doesn't have the concept of tuples; this restores the return
+        # value to being a list of tuples.
+        return [tuple(w) for w in weights]
+
+    @classmethod
+    def _load_multi(cls, sr_ids):
+        skeys = {sr_id: str(sr_id) for sr_id in sr_ids}
+        adweights = cls._cf.multiget(skeys.values(), columns=[cls.column])
+        res = {}
+        for sr_id in sr_ids:
+            # The returned dictionary should include all sr_ids, so
+            # that ad-less SRs are inserted into the cache
+            res[skeys[sr_id]] = adweights.get(sr_id, {})
+        return res
+
+    @classmethod
+    def get(cls, sr_ids):
+        """Return a dictionary of sr_id -> list of ads for each of sr_ids"""
+        # Mangling: Caller convention is to use empty string for FRONT_PAGE
+        sr_ids = [(sr_id or cls.FRONT_PAGE) for sr_id in sr_ids]
+        adweights = sgm(cls.cache, sr_ids, cls._load_multi,
+                        prefix=cls.cache_prefix)
+        results = {sr_id: cls.from_columns(adweights[sr_id])
+                   for sr_id in adweights}
+        if cls.FRONT_PAGE in results:
+            results[''] = results.pop(cls.FRONT_PAGE)
+        return results
+
+    @classmethod
+    def set_all_from_weights(cls, all_weights):
+        """Given a dictionary with all ads that should currently be running
+        (where the dictionary keys are the subreddit IDs, and the paired
+        value is the list of ads for that subreddit), update the ad system
+        to use those ads on those subreddits.
+
+        Note: Old ads are not cleared out. It is expected that the caller
+        include empty-list entries in `all_weights` for any Subreddits
+        that should be cleared.
+
+        """
+        weights = {}
+        all_ads = []
+        for sr_id, sr_ads in all_weights.iteritems():
+            all_ads.extend(sr_ads)
+            weights[str(sr_id)] = cls.to_columns(sr_ads)
+        weights[cls.ALL_ADS] = cls.to_columns(all_ads)
+        cls._cf.batch_insert(weights, ttl=cls._ttl)
+        # Prep the cache!
+        cls.cache.set_multi(weights, prefix=cls.cache_prefix)
+
+    @classmethod
+    def clear(cls, sr_id):
+        """Clear ad information from the Subreddit with ID `sr_id`"""
+        cls.set_all_from_weights({sr_id: []})
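The `to_columns`/`from_columns` pair is just a JSON round-trip; since JSON has no tuples, reads rebuild them. A self-contained sketch with a hypothetical ad tuple:

import json

column = 'adweights'
ads = [('t3_aa', 0.25, 't8_x')]                  # hypothetical ad tuple
columns = {column: json.dumps(ads)}              # what to_columns() stores
restored = [tuple(w) for w in json.loads(columns.get(column, '[]'))]
assert restored == ads                           # from_columns() round-trip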
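And the intended calling pattern, sketched with hypothetical subreddit IDs (not runnable standalone, since it needs the app's Cassandra column family and g.cache):

# Hypothetical calling pattern; IDs and fullnames are example values.
weights = {
    10: [('t3_aa', 1.0, 't8_x')],                 # subreddit id 10
    LiveAdWeights.FRONT_PAGE: [('t3_bb', 1.0, 't8_y')],
    11: [],                                       # explicitly clear sr 11
}
LiveAdWeights.set_all_from_weights(weights)       # writes rows + primes cache
live = LiveAdWeights.get([10, 11, ''])            # '' is mangled to FRONT_PAGE
# live[10] has the ad list, live[11] == [], and the front-page ads
# come back under the '' key.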