From e34e8d8d3e55581f4cf9b4994c1222004d006e22 Mon Sep 17 00:00:00 2001
From: Keith Mitchell
Date: Wed, 12 Sep 2012 15:32:34 -0700
Subject: [PATCH] Store live promotions as structured C* data

Initially this is dual-write only; reads continue to come from the
permacache copy.
---
 r2/r2/lib/promote.py    |  89 ++++++++++++-----
 r2/r2/models/bidding.py | 214 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 275 insertions(+), 28 deletions(-)

diff --git a/r2/r2/lib/promote.py b/r2/r2/lib/promote.py
index 4a80004af..f6744ffaf 100644
--- a/r2/r2/lib/promote.py
+++ b/r2/r2/lib/promote.py
@@ -23,6 +23,7 @@
 from __future__ import with_statement
 
 from r2.models import *
+from r2.models.bidding import SponsorBoxWeightings, WeightingRef
 from r2.lib.wrapped import Wrapped
 from r2.lib import authorize
 from r2.lib import emailer, filters
@@ -558,9 +559,14 @@ def accept_promotion(link):
 def reject_promotion(link, reason = None):
     PromotionLog.add(link, 'status update: rejected')
     # update the query queue
+    # The status is updated first so that if make_daily_promotions
+    # happens to run while we're working here, it will correctly
+    # exclude this link
     set_status(link, STATUS.rejected)
-    # check to see if this link is a member of the current live list
-    links, weighted = get_live_promotions()
+
+    # Update just the permacache list; the permacache path ignores
+    # the srids argument, so send an empty list
+    links, weighted = get_live_promotions([], _use_cass=False)
     if link._fullname in links:
         links.remove(link._fullname)
         for k in list(weighted.keys()):
@@ -568,18 +574,25 @@ def reject_promotion(link, reason = None):
                            if lid != link._fullname]
             if not weighted[k]:
                 del weighted[k]
-        set_live_promotions((links, weighted))
+        set_live_promotions(links, weighted, which=("permacache",))
         PromotionLog.add(link, 'dequeued')
-    # don't send a rejection email when the rejection was user initiated.
+
+    # Update just the Cassandra version
+    campaigns = PromoCampaign._by_link(link._id)
+    subreddits = Subreddit._by_name([c.sr_name for c in campaigns],
+                                    return_dict=False)
+    SponsorBoxWeightings.remove_link(link._fullname, subreddits)
+
+    # Send a rejection email (unless the advertiser requested the rejection)
     if not c.user or c.user._id != link.author_id:
         emailer.reject_promo(link, reason = reason)
 
+
 def unapprove_promotion(link):
     PromotionLog.add(link, 'status update: unapproved')
     # update the query queue
     set_status(link, STATUS.unseen)
-    links, weghts = get_live_promotions()
 
 def accepted_campaigns(offset=0):
     now = promo_datetime_now(offset=offset)
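Reviewer note: in this dual-write phase every caller still reads from the
permacache side (`_use_cass` defaults to False), so the new srids parameter
is plumbing for a later flip to Cassandra reads. A minimal sketch of the
intended post-flip read path; it is not exercised by this patch, and
`sr_ids` is a hypothetical list of subreddit ids:

    # Sketch only: how callers would read once the flip happens. On the
    # way in, '' (the default/frontpage key) is translated to the
    # DEFAULT_SR_ID row; on the way out, DEFAULT_SR_ID and ALL_ADS_ID
    # come back as '' and 'all'.
    links, weights = get_live_promotions(sr_ids, _use_cass=True)
    frontpage_promos = weights.get('', [])  # (link, weight, campaign) tuples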
@@ -722,11 +735,45 @@ def weight_schedule(by_sr):
 def promotion_key():
     return "current_promotions:1"
 
-def get_live_promotions():
-    return g.permacache.get(promotion_key()) or (set(), {})
+def get_live_promotions(srids, _use_cass=False):
+    if _use_cass:
+        timer = g.stats.get_timer("promote.get_live.cass")
+        timer.start()
+        links = set()
+        weights = {}
+        find_srids = set(srids)
+        if '' in find_srids:
+            find_srids.remove('')
+            find_srids.add(SponsorBoxWeightings.DEFAULT_SR_ID)
+        ads = SponsorBoxWeightings.load_multi(find_srids)
+        for srid, refs in ads.iteritems():
+            links.update([ref.data['link'] for ref in refs])
+            promos = [ref.to_promo() for ref in refs]
+            if srid == SponsorBoxWeightings.DEFAULT_SR_ID:
+                srid = ''
+            elif srid == SponsorBoxWeightings.ALL_ADS_ID:
+                srid = 'all'
+            weights[srid] = promos
+        timer.stop()
+    else:
+        timer = g.stats.get_timer("promote.get_live.permacache")
+        timer.start()
+        links, weights = g.permacache.get(promotion_key()) or (set(), {})
+        timer.stop()
+    return links, weights
 
-def set_live_promotions(x):
-    return g.permacache.set(promotion_key(), x)
+
+def set_live_promotions(links, weights, which=("cass", "permacache")):
+    if "cass" in which:
+        timer = g.stats.get_timer("promote.set_live.cass")
+        timer.start()
+        SponsorBoxWeightings.set_from_weights(weights)
+        timer.stop()
+    if "permacache" in which:
+        timer = g.stats.get_timer("promote.set_live.permacache")
+        timer.start()
+        g.permacache.set(promotion_key(), (links, weights))
+        timer.stop()
 
 # Gotcha: even if links are scheduled and authorized, they won't be added to
 # current promotions until they're actually charged, so make sure to call
@@ -738,8 +786,6 @@ def make_daily_promotions(offset = 0, test = False):
     test - if True, new schedule will be generated but not launched
     Raises Exception with list of campaigns that had errors if there were any
     '''
-    old_links = set([])
-
     schedule = get_scheduled(offset)
     all_links = set([l._fullname for l in schedule['links']])
     error_campaigns = schedule['error_campaigns']
@@ -755,18 +801,16 @@ def make_daily_promotions(offset = 0, test = False):
         if not test:
             l._commit()
 
-    x = get_live_promotions()
-    if x:
-        old_links, old_weights = x
-        # links that need to be promoted
-        new_links = all_links - old_links
-        # links that have already been promoted
-        old_links = old_links - all_links
-    else:
-        new_links = links
+    old_links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
+
+    # links that need to be promoted
+    new_links = all_links - old_links
+    # links that have already been promoted
+    old_links = old_links - all_links
 
     links = Link._by_fullname(new_links.union(old_links), data = True,
                               return_dict = True)
+
     for l in old_links:
         if is_promoted(links[l]):
             if test:
@@ -793,7 +837,7 @@ def make_daily_promotions(offset = 0, test = False):
         weighted = dict((srs[k], v) for k, v in
                         weighted.iteritems())
 
     if not test:
-        set_live_promotions((all_links, weighted))
+        set_live_promotions(all_links, weighted)
     else:
         print (all_links, weighted)
 
@@ -822,7 +866,7 @@ def get_promotion_list(user, site):
 
 #@memoize('get_promotions_cached', time = 10 * 60)
 def get_promotions_cached(sites):
-    p = get_live_promotions()
+    p = get_live_promotions(sites)
     if p:
         links, promo_dict = p
         available = {}
@@ -838,7 +882,7 @@ def get_promotions_cached(sites):
         norm = sum(available.values())
         # return a sorted list of (link, norm_weight)
         return [(l, available[l] / norm, campaigns[l]) for l in links]
-
+    return []
 
 def randomized_promotion_list(user, site):
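Reviewer note: since both stores are now written on every schedule but only
permacache is read, it may be worth spot-checking parity before any read
flip. A hedged console sketch, not part of this patch (assumes a paster
shell with `g` configured; the all-ads row should cover every link the
permacache tuple knows about):

    # Hypothetical parity check, illustrative only.
    from r2.lib import promote
    from r2.models.bidding import SponsorBoxWeightings

    pc_links, pc_weights = promote.get_live_promotions([], _use_cass=False)
    c_links, c_weights = promote.get_live_promotions(
        [SponsorBoxWeightings.ALL_ADS_ID], _use_cass=True)
    assert pc_links == c_links, (pc_links - c_links, c_links - pc_links)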
diff --git a/r2/r2/models/bidding.py b/r2/r2/models/bidding.py
index f1d5876d6..15df149c8 100644
--- a/r2/r2/models/bidding.py
+++ b/r2/r2/models/bidding.py
@@ -20,6 +20,12 @@
 # Inc. All Rights Reserved.
 ###############################################################################
 
+import datetime
+import itertools
+import json
+
+import pycassa
+from pylons import g, request
 from sqlalchemy import Column, String, DateTime, Date, Float, Integer, Boolean,\
     BigInteger, func as safunc, and_, or_
 from sqlalchemy.exc import IntegrityError
@@ -28,14 +35,14 @@ from sqlalchemy.orm import sessionmaker, scoped_session
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.dialects.postgresql.base import PGInet as Inet
 from sqlalchemy.ext.declarative import declarative_base
-from pylons import g
-from r2.lib.utils import Enum
+
+from r2.lib.utils import Enum, fetch_things2
 from r2.models.account import Account
-from r2.models import Link
+from r2.models import Link, Subreddit
+import r2.lib.db.operators as db_ops
 from r2.lib.db.thing import Thing, NotFound
-from pylons import request
+import r2.lib.db.tdb_cassandra as tdb_cassandra
 from r2.lib.memoize import memoize
-import datetime
 
 engine = g.dbm.get_engine('authorize')
 
@@ -585,6 +592,204 @@ class PromotionWeights(Sessionized, Base):
             d += datetime.timedelta(1)
         return res
 
+
+class WeightingRef(object):
+    def __init__(self, weight, data):
+        self.data = data
+        self.data['weight'] = weight
+        self.weight = weight
+
+    @classmethod
+    def from_promo(cls, link_fn, weight, campaign_fn):
+        data = {"link": link_fn, "campaign": campaign_fn, "weight": weight}
+        return cls(weight, data)
+
+    @classmethod
+    def from_cass(cls, string):
+        data = json.loads(string)
+        return cls(data['weight'], data)
+
+    def to_promo(self):
+        return (self.data['link'], self.data['weight'], self.data['campaign'])
+
+    def to_cass(self):
+        return json.dumps(self.data)
+
+    def __repr__(self):
+        return "%s(%r, %r)" % (self.__class__.__name__, self.weight,
+                               self.data)
+
+    def __str__(self):
+        return "WeightingRef: %s" % self.to_cass()
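+
+# A round-trip example for reviewers (hypothetical fullnames; the
+# json.dumps key order may vary):
+#
+#   ref = WeightingRef.from_promo('t3_x', 0.5, 't8_y')
+#   ref.to_cass()
+#   # -> '{"link": "t3_x", "campaign": "t8_y", "weight": 0.5}'
+#   WeightingRef.from_cass(ref.to_cass()).to_promo()
+#   # -> ('t3_x', 0.5, 't8_y')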
+
+
+class SponsorBoxWeightings(object):
+    __metaclass__ = tdb_cassandra.ThingMeta
+    _use_db = True
+    _read_consistency_level = tdb_cassandra.CL.ONE
+    # Written at QUORUM to avoid pulling stale data back into caches;
+    # only the cron job and reject_promotion will be writing
+    _write_consistency_level = tdb_cassandra.CL.QUORUM
+    _connection_pool = 'main'
+    _compare_with = pycassa.INT_TYPE
+    _str_props = ()
+    _type_prefix = None
+    _cf_name = None
+    _SCALE = 2**32
+
+    # TTL for the timestamped rows. Set to one day so that we can still
+    # examine the data if something goes wrong.
+    TTL = datetime.timedelta(days=1).total_seconds()
+
+    DEFAULT_SR_ID = 0
+
+    # TODO: the concept of an "all ads" row should not be needed
+    # once the permacache implementation is removed
+    ALL_ADS_ID = -1
+
+    _IDX_ROWKEY_FMT = '%s.latest'
+    _IDX_COLUMN_KEY = 0
+
+    class ID(int):
+        @property
+        def _id(self):
+            return int(self)
+
+    def __init__(self, subreddit, timestamp, items, is_srid=True):
+        self.subreddit = self.ID(subreddit) if is_srid else subreddit
+        self.items = items
+        self.timeslot = tdb_cassandra.date_serializer.pack(timestamp)
+
+    @classmethod
+    def index_column(cls, subreddit):
+        return subreddit._id
+
+    @classmethod
+    def index_rowkey(cls, subreddit):
+        return cls._IDX_ROWKEY_FMT % subreddit._id
+
+    def rowkey(self):
+        return '%s.%s' % (self.subreddit._id, self.timeslot)
+
+    @classmethod
+    def get_latest_rowkey(cls, subreddit):
+        # This is split into two layers so the memoize key can be an ID
+        # instead of a Subreddit object
+        return cls._get_latest_rowkey(cls.index_rowkey(subreddit))
+
+    @classmethod
+    @memoize('sponsor_box_weightings_rowkey', time=60 * 60, stale=False)
+    def _get_latest_rowkey(cls, idx_rowkey, _update=False):
+        if _update:
+            # Don't spoil the memoize cache with outdated data
+            rcl = tdb_cassandra.CL.QUORUM
+        else:
+            rcl = cls._read_consistency_level
+        try:
+            return cls._cf.get(idx_rowkey, columns=[cls._IDX_COLUMN_KEY],
+                               read_consistency_level=rcl)[cls._IDX_COLUMN_KEY]
+        except tdb_cassandra.NotFoundException:
+            return None
+
+    @classmethod
+    def load_by_sr(cls, sr_id):
+        rowkey = cls.get_latest_rowkey(cls.ID(sr_id))
+        if rowkey:
+            data = cls._load(rowkey)
+        else:
+            data = []
+        return data
+
+    @classmethod
+    @memoize('sponsor_box_weightings__load', time=60 * 60, stale=True)
+    def _load(cls, rowkey, _update=False):
+        return [WeightingRef.from_cass(val)
+                for dummy, val in cls._cf.xget(rowkey)]
+
+    @classmethod
+    def load_multi(cls, sr_ids):
+        # TODO: use multiget & fully read rows
+        return {sr_id: cls.load_by_sr(sr_id) for sr_id in sr_ids}
+
+    def set_as_latest(self):
+        self.set_timeslots()
+        self._set_as_latest()
+
+    @tdb_cassandra.will_write
+    def _set_as_latest(self):
+        rowkey = self.rowkey() if not self.empty else ''
+        self._cf.insert(self.index_rowkey(self.subreddit),
+                        {self._IDX_COLUMN_KEY: rowkey},
+                        ttl=self.TTL)
+
+        self._get_latest_rowkey(self.index_rowkey(self.subreddit),
+                                _update=True)
+
+    @tdb_cassandra.will_write
+    def set_timeslots(self):
+        campaign_weights = self._calculate_weights()
+        if campaign_weights:
+            self._cf.insert(self.rowkey(), campaign_weights, ttl=self.TTL)
+
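+    # Storage layout sketch for reviewers (illustrative values; the srid
+    # 103 and the link fullnames are made up). For two ads weighted
+    # 0.25 / 0.75:
+    #
+    #   index row '103.latest' -> {0: '103.<packed timestamp>'}
+    #   data row  '103.<packed timestamp>' ->
+    #       {1073741824: '{"link": "t3_a", ...}',   # slice (0, 2**30]
+    #        4294967296: '{"link": "t3_b", ...}'}   # slice (2**30, 2**32]
+    #
+    # Because _compare_with is INT_TYPE the cumulative column keys sort
+    # numerically, so a reader can draw a random int in [0, 2**32) and
+    # slice for the first column key greater than it.
+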
+    def _calculate_weights(self):
+        # Distribute the ads as "slices" of the range 0 - 2**32.
+        # Each ad gets the slice from the prior ad's column key to its
+        # own column key.
+        weight_tally = 0
+        total_weight = float(sum(item.weight for item in self.items))
+        campaign_weights = {}
+        for item in self.items:
+            scaled_weight = int(item.weight / total_weight * self._SCALE)
+            weight_tally += scaled_weight
+            campaign_weights[weight_tally] = item.to_cass()
+        if weight_tally > self._SCALE:
+            raise ValueError("Awkward: Your math was a bit off.")
+        return campaign_weights
+
+    @property
+    def empty(self):
+        return not bool(self.items)
+
+    @classmethod
+    def set_from_weights(cls, all_weights):
+        weights = all_weights.copy()
+        all_ads = itertools.chain.from_iterable(all_weights.itervalues())
+        weights[cls.ALL_ADS_ID] = all_ads
+        if '' in weights:
+            weights[cls.DEFAULT_SR_ID] = weights.pop('')
+
+        timeslot = datetime.datetime.now(g.tz)
+
+        while weights:
+            srid, promos = weights.popitem()
+            weight_refs = [WeightingRef.from_promo(*promo) for promo in promos]
+            sbw = cls(srid, timeslot, weight_refs, is_srid=True)
+            sbw.set_as_latest()
+
+        # Clear out expired ads
+        query = Subreddit._query(sort=db_ops.desc('_date'), data=False)
+        for subreddit in fetch_things2(query):
+            if subreddit._id not in all_weights:
+                cls.clear(subreddit, timeslot=timeslot)
+
+    @classmethod
+    def clear(cls, srid, timeslot=None, is_srid=False):
+        timeslot = timeslot or datetime.datetime.now(g.tz)
+        cls(srid, timeslot, [], is_srid=is_srid).set_as_latest()
+
+    @classmethod
+    def remove_link(cls, link_fn, from_subreddits, include_all_sr=True):
+        now = datetime.datetime.now(g.tz)
+        if include_all_sr:
+            srs = itertools.chain(from_subreddits, [cls.ID(cls.ALL_ADS_ID)])
+        else:
+            srs = iter(from_subreddits)
+        for subreddit in srs:
+            current = cls.load_by_sr(subreddit._id)
+            updated = [r for r in current if r.data['link'] != link_fn]
+            # These are Subreddit/ID objects rather than bare ints, so
+            # is_srid must be False here
+            cls(subreddit, now, updated, is_srid=False).set_as_latest()
+
+
 def to_date(d):
     if isinstance(d, datetime.datetime):
         return d.date()
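Postscript for reviewers: nothing in this patch consumes the
cumulative-weight column keys yet; get_live_promotions simply rehydrates
every ref in the latest row via xget. If the plan is to eventually pick a
single weighted-random ad per request straight from Cassandra, the layout
already supports it with one slice query. A minimal sketch under that
assumption ("cf" is a pycassa ColumnFamily for this column family and
"rowkey" is a data-row key as built by rowkey() above; neither helper
exists in this patch):

    import json
    import random

    import pycassa


    def pick_ad(cf, rowkey, scale=2**32):
        # Column keys are cumulative scaled weights, so the first column
        # key strictly greater than a uniform draw selects each ad with
        # probability proportional to its weight.
        draw = random.randint(0, scale - 1)
        try:
            # column_start is inclusive, so draw + 1 finds the first
            # column key greater than draw.
            cols = cf.get(rowkey, column_start=draw + 1, column_count=1)
        except pycassa.NotFoundException:
            # int() truncation in _calculate_weights can leave a sliver
            # at the top of the range with no column; treat it as a miss.
            return None
        return json.loads(cols.values()[0])

The same draw could also be served from the already-loaded WeightingRefs
in memory; the slice query just avoids deserializing the whole row.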