diff --git a/r2/r2/lib/promote.py b/r2/r2/lib/promote.py
index 7f7121a51..b33600a6f 100644
--- a/r2/r2/lib/promote.py
+++ b/r2/r2/lib/promote.py
@@ -27,8 +27,8 @@ import json
 import time
 
 from r2.models import *
-from r2.models.bidding import SponsorBoxWeightings, WeightingRef
 from r2.models.keyvalue import NamedGlobals
+from r2.models.promo import LiveAdWeights
 from r2.lib.wrapped import Wrapped
 from r2.lib import authorize
 from r2.lib import emailer
@@ -475,7 +475,7 @@ def reject_promotion(link, reason = None):
     # while we're doing work here, it will correctly exclude it
     set_promote_status(link, PROMOTE_STATUS.rejected)
 
-    links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
+    links = get_live_promotions([LiveAdWeights.ALL_ADS])[0]
     if link._fullname in links:
         PromotionLog.add(link, 'Marked promotion for rejection')
         queue_changed_promo(link, "rejected")
@@ -669,20 +669,9 @@ def get_live_promotions(srids, from_permacache=True):
         timer = g.stats.get_timer("promote.get_live.cass")
         timer.start()
         links = set()
-        weights = {}
-        find_srids = set(srids)
-        if '' in find_srids:
-            find_srids.remove('')
-            find_srids.add(SponsorBoxWeightings.FRONT_PAGE)
-        ads = SponsorBoxWeightings.load_multi(find_srids)
-        for srid, refs in ads.iteritems():
-            links.update(ref.data['link'] for ref in refs)
-            promos = [ref.to_promo() for ref in refs]
-            if srid == SponsorBoxWeightings.FRONT_PAGE:
-                srid = ''
-            elif srid == SponsorBoxWeightings.ALL_ADS_ID:
-                srid = 'all'
-            weights[srid] = promos
+        weights = LiveAdWeights.get(srids)
+        for promos in weights.itervalues():
+            links.update(link_fn for link_fn, weight, campaign_fn in promos)
         timer.stop()
     else:
         timer = g.stats.get_timer("promote.get_live.permacache")
@@ -701,7 +690,23 @@ def set_live_promotions(links, weights, which=("cass", "permacache")):
     if "cass" in which:
         timer = g.stats.get_timer("promote.set_live.cass")
         timer.start()
-        SponsorBoxWeightings.set_from_weights(weights)
+
+        # First, figure out which subreddits have had ads recently
+        today = promo_datetime_now()
+        yesterday = today - timedelta(days=1)
+        tomorrow = today + timedelta(days=1)
+        promo_weights = PromotionWeights.get_campaigns(yesterday, tomorrow)
+        subreddit_names = set(p.sr_name for p in promo_weights)
+        subreddits = Subreddit._by_name(subreddit_names).values()
+        # Set the default for those subreddits to no ads
+        all_weights = {sr._id: [] for sr in subreddits}
+
+        # Mix in the currently live ads
+        all_weights.update(weights)
+        if '' in all_weights:
+            all_weights[LiveAdWeights.FRONT_PAGE] = all_weights.pop('')
+
+        LiveAdWeights.set_all_from_weights(all_weights)
         timer.stop()
 
 # Gotcha: even if links are scheduled and authorized, they won't be added to
@@ -729,7 +734,7 @@ def make_daily_promotions(offset = 0, test = False):
         if not test:
             l._commit()
 
-    old_links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
+    old_links = get_live_promotions([LiveAdWeights.ALL_ADS])[0]
 
     # links that need to be promoted
     new_links = all_links - old_links
@@ -794,7 +799,7 @@ def get_promotion_list(user, site):
 
 
 def get_promotions_cached(sites):
-    p = get_live_promotions(sites)
+    p = get_live_promotions(sites, from_permacache=True)
     if p:
         links, promo_dict = p
         available = {}
diff --git a/r2/r2/models/bidding.py b/r2/r2/models/bidding.py
index 0720eae26..104d3c30c 100644
--- a/r2/r2/models/bidding.py
+++ b/r2/r2/models/bidding.py
@@ -466,188 +466,10 @@ class PromotionWeights(Sessionized, Base):
         return res
 
 
-class WeightingRef(object):
-    def __init__(self, weight, data):
-        self.data = data
-        self.data['weight'] = weight
-        self.weight = weight
-
-    @classmethod
-    def from_promo(cls, link_fn, weight, campaign_fn):
-        data = {"link": link_fn, "campaign": campaign_fn, "weight": weight}
-        return cls(weight, data)
-
-    @classmethod
-    def from_cass(cls, string):
-        data = json.loads(string)
-        return cls(data['weight'], data)
-
-    def to_promo(self):
-        return (self.data['link'], self.data['weight'], self.data['campaign'])
-
-    def to_cass(self):
-        return json.dumps(self.data)
-
-    def __repr__(self):
-        return "%s(%r, %r)" % (self.__class__.__name__, self.weight,
-                               self.data)
-
-    def __str__(self):
-        return "WeightingRef: %s" % self.to_cass()
-
-
-class SponsorBoxWeightings(object):
-    __metaclass__ = tdb_cassandra.ThingMeta
-    _use_db = True
-    _read_consistency_level = tdb_cassandra.CL.ONE
-    # To avoid spoiling caches when pulling back? Only the cron job
-    # and reject_promo will be writing.
-    _write_consistency_level = tdb_cassandra.CL.QUORUM
-    _connection_pool = 'main'
-    _compare_with = pycassa.INT_TYPE
-    _str_props = ()
-    _type_prefix = None
-    _cf_name = None
-    _SCALE = 2**32
-
-    # TTL is set to 1 day so that we can examine
-    # the data if something goes wrong.
-    _ttl = datetime.timedelta(days=1)
-
-    FRONT_PAGE = 0
-
-    # TODO The concept of an "All ads row" should not be needed
-    # after the permacache implementation is removed.
-    ALL_ADS_ID = -1
-
-    _IDX_ROWKEY_FMT = '%s/index'
-    _IDX_COLUMN_KEY = 0
-
-    class ID(int):
-        @property
-        def _id(self):
-            return int(self)
-
-    def __init__(self, subreddit, timestamp, items, is_srid=True,
-                 mutator=None):
-        self.subreddit = self.ID(subreddit) if is_srid else subreddit
-        self.items = items
-        self.timeslot = tdb_cassandra.date_serializer.pack(timestamp)
-        # If a Mutator is not passed in, the ColumnFamily has a compatible
-        # insert() method
-        self.mutator = mutator or self._cf
-
-    @classmethod
-    def index_rowkey(cls, subreddit):
-        return cls._IDX_ROWKEY_FMT % subreddit._id
-
-    def rowkey(self):
-        return '%s/%s' % (self.subreddit._id, self.timeslot)
-
-    @classmethod
-    def get_latest_rowkey(cls, subreddit):
-        idx_rowkey = cls.index_rowkey(subreddit)
-        try:
-            row = cls._cf.get(idx_rowkey, columns=[cls._IDX_COLUMN_KEY])
-        except tdb_cassandra.NotFoundException:
-            return None
-        else:
-            return row[cls._IDX_COLUMN_KEY]
-
-    @classmethod
-    def load_by_sr(cls, sr_id):
-        rowkey = cls.get_latest_rowkey(cls.ID(sr_id))
-        if rowkey:
-            data = cls._load(rowkey)
-        else:
-            data = []
-        return data
-
-    @classmethod
-    def _load(cls, rowkey):
-        return [WeightingRef.from_cass(val)
-                for dummy, val in cls._cf.xget(rowkey)]
-
-    @classmethod
-    def load_multi(cls, sr_ids):
-        # TODO: Use multiget & fully read rows
-        return {sr_id: cls.load_by_sr(sr_id) for sr_id in sr_ids}
-
-    def set_as_latest(self):
-        self.set_timeslots()
-        self._set_as_latest()
-
-    @tdb_cassandra.will_write
-    def _set_as_latest(self):
-        rowkey = self.rowkey() if not self.empty else ''
-        self.mutator.insert(self.index_rowkey(self.subreddit),
-                            {self._IDX_COLUMN_KEY: rowkey},
-                            ttl=self._ttl)
-
-    @tdb_cassandra.will_write
-    def set_timeslots(self):
-        campaign_weights = self._calculate_weights()
-        if campaign_weights:
-            self.mutator.insert(self.rowkey(), campaign_weights, ttl=self._ttl)
-
-    def _calculate_weights(self):
-        # Distribute the ads as "slices" of the range 0 - 2^32
-        # Each Ad gets the slice from the prior Ad's column key to its own
-        # column key.
-        weight_tally = 0
-        total_weight = float(sum(item.weight for item in self.items))
-        campaign_weights = {}
-        for item in self.items:
-            scaled_weight = int(item.weight / total_weight * self._SCALE)
-            weight_tally += scaled_weight
-            campaign_weights[weight_tally] = item.to_cass()
-        if weight_tally > self._SCALE:
-            raise ValueError("Awkward: Your math was a bit off.")
-        return campaign_weights
-
-    @property
-    def empty(self):
-        return not bool(self.items)
-
-    @classmethod
-    def set_from_weights(cls, all_weights):
-        weights = all_weights.copy()
-        all_ads = itertools.chain.from_iterable(all_weights.itervalues())
-        weights[cls.ALL_ADS_ID] = all_ads
-        if '' in weights:
-            weights[cls.FRONT_PAGE] = weights.pop('')
-
-        today = datetime.datetime.now(g.tz)
-
-        while weights:
-            srid, promos = weights.popitem()
-            weight_refs = [WeightingRef.from_promo(*promo) for promo in promos]
-            sbw = cls(srid, today, weight_refs, is_srid=True)
-            sbw.set_as_latest()
-
-        # Clear out expired ads
-
-        # First, figure out which subreddits may have had ads
-        yesterday = today - datetime.timedelta(days=1)
-        tomorrow = today + datetime.timedelta(days=1)
-        promo_weights = PromotionWeights.get_campaigns(yesterday, tomorrow)
-        subreddit_names = set(p.sr_name for p in promo_weights)
-        subreddits = Subreddit._by_name(subreddit_names).values()
-
-        # Next, clear out any ads from those subreddits, if and only if
-        # that subreddit wasn't updated already during this function call
-        mutator = cls._cf.batch()
-        for sr in subreddits:
-            if sr._id not in all_weights:
-                cls.clear(sr, timeslot=today, mutator=mutator)
-        mutator.send()
-
-    @classmethod
-    def clear(cls, srid, timeslot=None, is_srid=False, mutator=None):
-        timeslot = timeslot or datetime.datetime.now(g.tz)
-        sbw = cls(srid, timeslot, [], is_srid=is_srid, mutator=mutator)
-        sbw.set_as_latest()
-
+def to_date(d):
+    if isinstance(d, datetime.datetime):
+        return d.date()
+    return d
 
 # do all the leg work of creating/connecting to tables
 if g.db_create_tables:
diff --git a/r2/r2/models/promo.py b/r2/r2/models/promo.py
index ccf04f4bd..d247911bf 100644
--- a/r2/r2/models/promo.py
+++ b/r2/r2/models/promo.py
@@ -20,16 +20,19 @@
 # Inc. All Rights Reserved.
 ###############################################################################
 
-from datetime import datetime
+from datetime import datetime, timedelta
 from uuid import uuid1
+import json
 
 from pylons import g, c
 
 from r2.lib import filters
+from r2.lib.cache import sgm
 from r2.lib.db import tdb_cassandra
 from r2.lib.db.thing import Thing, NotFound
 from r2.lib.memoize import memoize
 from r2.lib.utils import Enum
+from r2.models.subreddit import Subreddit
 
 
 PROMOTE_STATUS = Enum("unpaid", "unseen", "accepted", "rejected",
@@ -38,7 +41,6 @@ PROMOTE_STATUS = Enum("unpaid", "unseen", "accepted", "rejected",
 
 @memoize("get_promote_srid")
 def get_promote_srid(name = 'promos'):
-    from r2.models.subreddit import Subreddit
     try:
         sr = Subreddit._by_name(name, stale=True)
     except NotFound:
@@ -134,3 +136,90 @@ class PromotionLog(tdb_cassandra.View):
         return [t[1] for t in tuples]
 
 
+class LiveAdWeights(object):
+    """Data store for per-subreddit lists of currently running ads"""
+    __metaclass__ = tdb_cassandra.ThingMeta
+    _use_db = True
+    _connection_pool = 'main'
+    _type_prefix = None
+    _cf_name = None
+    _compare_with = tdb_cassandra.ASCII_TYPE
+    # TTL is 12 hours, to avoid unexpected ads running indefinitely
+    # See note in set_all_from_weights() for more information
+    _ttl = timedelta(hours=12)
+
+    column = 'adweights'
+    cache = g.cache
+    cache_prefix = 'live-adweights-'
+
+    ALL_ADS = 'all'
+    FRONT_PAGE = 'frontpage'
+
+    def __init__(self):
+        raise NotImplementedError()
+
+    @classmethod
+    def to_columns(cls, weights):
+        """Generate a serializable dict representation of the weights"""
+        return {cls.column: json.dumps(weights)}
+
+    @classmethod
+    def from_columns(cls, columns):
+        """Given a (serializable) dict, restore the weights"""
+        weights = json.loads(columns.get(cls.column, '[]')) if columns else []
+        # JSON doesn't have the concept of tuples; this restores the return
+        # value to being a list of tuples.
+        return [tuple(w) for w in weights]
+
+    @classmethod
+    def _load_multi(cls, sr_ids):
+        skeys = {sr_id: str(sr_id) for sr_id in sr_ids}
+        adweights = cls._cf.multiget(skeys.values(), columns=[cls.column])
+        res = {}
+        for sr_id in sr_ids:
+            # The returned dictionary should include all sr_ids, so
+            # that ad-less SRs are inserted into the cache
+            res[skeys[sr_id]] = adweights.get(sr_id, {})
+        return res
+
+    @classmethod
+    def get(cls, sr_ids):
+        """Return a dictionary of sr_id -> list of ads for each of sr_ids"""
+        # Mangling: Caller convention is to use empty string for FRONT_PAGE
+        sr_ids = [(sr_id or cls.FRONT_PAGE) for sr_id in sr_ids]
+        adweights = sgm(cls.cache, sr_ids, cls._load_multi,
+                        prefix=cls.cache_prefix)
+        results = {sr_id: cls.from_columns(adweights[sr_id])
+                   for sr_id in adweights}
+        if cls.FRONT_PAGE in results:
+            results[''] = results.pop(cls.FRONT_PAGE)
+        return results
+
+    @classmethod
+    def set_all_from_weights(cls, all_weights):
+        """Given a dictionary with all ads that should currently be running
+        (where the dictionary keys are the subreddit IDs, and the paired
+        value is the list of ads for that subreddit), update the ad system
+        to use those ads on those subreddits.
+
+        Note: Old ads are not cleared out. It is expected that the caller
+        include empty-list entries in `all_weights` for any Subreddits
+        that should be cleared.
+
+        """
+        weights = {}
+        all_ads = []
+        for sr_id, sr_ads in all_weights.iteritems():
+            all_ads.extend(sr_ads)
+            weights[str(sr_id)] = cls.to_columns(sr_ads)
+        weights[cls.ALL_ADS] = cls.to_columns(all_ads)
+
+        cls._cf.batch_insert(weights, ttl=cls._ttl)
+
+        # Prep the cache!
+        cls.cache.set_multi(weights, prefix=cls.cache_prefix)
+
+    @classmethod
+    def clear(cls, sr_id):
+        """Clear ad information from the Subreddit with ID `sr_id`"""
+        cls.set_all_from_weights({sr_id: []})
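
Below is a minimal, runnable sketch (Python 2, matching the codebase) of the storage convention the new LiveAdWeights class relies on: each subreddit row holds its live ads as a single JSON column of (link_fullname, weight, campaign_fullname) tuples, and writing an empty list overwrites (and so clears) a subreddit's row. InMemoryColumnFamily, COLUMN, and the sample fullnames are invented stand-ins for pycassa's ColumnFamily and real thing IDs; they are not part of the diff.

import json

COLUMN = 'adweights'


class InMemoryColumnFamily(object):
    """Dict-backed stand-in for the pycassa ColumnFamily used above."""
    def __init__(self):
        self.rows = {}

    def batch_insert(self, rows, ttl=None):
        # ttl is accepted for signature compatibility; a real column
        # family would expire the rows after `ttl` elapses
        self.rows.update(rows)

    def multiget(self, keys, columns=None):
        return dict((k, self.rows[k]) for k in keys if k in self.rows)


def to_columns(weights):
    # One JSON blob per row, as in LiveAdWeights.to_columns
    return {COLUMN: json.dumps(weights)}


def from_columns(columns):
    # JSON round-trips tuples as lists; convert back, as in
    # LiveAdWeights.from_columns
    weights = json.loads(columns.get(COLUMN, '[]')) if columns else []
    return [tuple(w) for w in weights]


cf = InMemoryColumnFamily()
ads = {
    't5_abc': [('t3_link1', 0.75, 't8_camp1'),
               ('t3_link2', 0.25, 't8_camp2')],
    'frontpage': [],  # an empty list clears a subreddit's ads
}
cf.batch_insert(dict((sr, to_columns(sr_ads))
                     for sr, sr_ads in ads.iteritems()))

fetched = cf.multiget(['t5_abc', 'frontpage'], columns=[COLUMN])
for sr in ('t5_abc', 'frontpage'):
    print sr, from_columns(fetched.get(sr, {}))
# t5_abc [('t3_link1', 0.75, 't8_camp1'), ('t3_link2', 0.25, 't8_camp2')]
# frontpage []

Keeping one JSON blob per row, instead of SponsorBoxWeightings' one column per cumulative weight slice plus an index-row indirection, is what lets get_live_promotions fetch a subreddit's ads with a single-column multiget.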