diff --git a/r2/r2/lib/promote.py b/r2/r2/lib/promote.py
index ae7312168..7f7121a51 100644
--- a/r2/r2/lib/promote.py
+++ b/r2/r2/lib/promote.py
@@ -692,7 +692,7 @@ def get_live_promotions(srids, from_permacache=True):
     return links, weights
 
 
-def set_live_promotions(links, weights, which=("permacache",)):
+def set_live_promotions(links, weights, which=("cass", "permacache")):
     if "permacache" in which:
         timer = g.stats.get_timer("promote.set_live.permacache")
         timer.start()
@@ -997,8 +997,8 @@ def run_changed(drain=False, limit=100, sleep_time=10, verbose=True):
         if QUEUE_ALL in items:
            # QUEUE_ALL is just an indicator to run make_daily_promotions.
            # There's no promotion log to update in this case.
-            print "Received QUEUE_ALL message"
-            items.remove(QUEUE_ALL)
+            print "Received %s QUEUE_ALL message(s)" % items.count(QUEUE_ALL)
+            items = [i for i in items if i != QUEUE_ALL]
             make_daily_promotions()
         links = Link._by_fullname([i["link"] for i in items])
         for item in items:
diff --git a/r2/r2/models/bidding.py b/r2/r2/models/bidding.py
index ad8462ec6..0720eae26 100644
--- a/r2/r2/models/bidding.py
+++ b/r2/r2/models/bidding.py
@@ -36,14 +36,14 @@
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.dialects.postgresql.base import PGInet as Inet
 from sqlalchemy.ext.declarative import declarative_base
 
-import r2.lib.db.operators as db_ops
+from r2.lib.utils import Enum
+from r2.models.account import Account
+from r2.models import Link, Subreddit
 from r2.lib.db.thing import Thing, NotFound
 import r2.lib.db.tdb_cassandra as tdb_cassandra
 from r2.lib.memoize import memoize
 from r2.lib.utils import Enum, fetch_things2, to_date
-from r2.models.account import Account
-from r2.models import Link, Subreddit
 
 
 engine = g.dbm.get_engine('authorize')
@@ -528,10 +528,14 @@ class SponsorBoxWeightings(object):
         def _id(self):
             return int(self)
 
-    def __init__(self, subreddit, timestamp, items, is_srid=True):
+    def __init__(self, subreddit, timestamp, items, is_srid=True,
+                 mutator=None):
         self.subreddit = self.ID(subreddit) if is_srid else subreddit
         self.items = items
         self.timeslot = tdb_cassandra.date_serializer.pack(timestamp)
+        # If a Mutator is not passed in, the ColumnFamily has a compatible
+        # insert() method
+        self.mutator = mutator or self._cf
 
     @classmethod
     def index_rowkey(cls, subreddit):
@@ -576,15 +580,15 @@ class SponsorBoxWeightings(object):
     @tdb_cassandra.will_write
     def _set_as_latest(self):
         rowkey = self.rowkey() if not self.empty else ''
-        self._cf.insert(self.index_rowkey(self.subreddit),
-                        {self._IDX_COLUMN_KEY: rowkey},
-                        ttl=self._ttl)
+        self.mutator.insert(self.index_rowkey(self.subreddit),
+                            {self._IDX_COLUMN_KEY: rowkey},
+                            ttl=self._ttl)
 
     @tdb_cassandra.will_write
     def set_timeslots(self):
         campaign_weights = self._calculate_weights()
         if campaign_weights:
-            self._cf.insert(self.rowkey(), campaign_weights, ttl=self._ttl)
+            self.mutator.insert(self.rowkey(), campaign_weights, ttl=self._ttl)
 
     def _calculate_weights(self):
         # Distribute the ads as "slices" of the range 0 - 2^32
@@ -613,36 +617,36 @@ class SponsorBoxWeightings(object):
         if '' in weights:
             weights[cls.FRONT_PAGE] = weights.pop('')
 
-        timeslot = datetime.datetime.now(g.tz)
+        today = datetime.datetime.now(g.tz)
         while weights:
             srid, promos = weights.popitem()
             weight_refs = [WeightingRef.from_promo(*promo)
                            for promo in promos]
-            sbw = cls(srid, timeslot, weight_refs, is_srid=True)
+            sbw = cls(srid, today, weight_refs, is_srid=True)
             sbw.set_as_latest()
 
         # Clear out expired ads
-        query = Subreddit._query(sort=db_ops.desc('_date'), data=False)
-        for subreddit in fetch_things2(query):
-            if subreddit._id not in all_weights:
-                cls.clear(subreddit, timeslot=timeslot)
+
+        # First, figure out which subreddits may have had ads
+        yesterday = today - datetime.timedelta(days=1)
+        tomorrow = today + datetime.timedelta(days=1)
+        promo_weights = PromotionWeights.get_campaigns(yesterday, tomorrow)
+        subreddit_names = set(p.sr_name for p in promo_weights)
+        subreddits = Subreddit._by_name(subreddit_names).values()
+
+        # Next, clear out any ads from those subreddits, if and only if
+        # that subreddit wasn't updated already during this function call
+        mutator = cls._cf.batch()
+        for sr in subreddits:
+            if sr._id not in all_weights:
+                cls.clear(sr, timeslot=today, mutator=mutator)
+        mutator.send()
 
     @classmethod
-    def clear(cls, srid, timeslot=None, is_srid=False):
+    def clear(cls, srid, timeslot=None, is_srid=False, mutator=None):
         timeslot = timeslot or datetime.datetime.now(g.tz)
-        cls(srid, timeslot, [], is_srid=is_srid).set_as_latest()
-
-    @classmethod
-    def remove_link(cls, link_fn, from_subreddits, include_all_sr=True):
-        now = datetime.datetime.now(g.tz)
-        if include_all_sr:
-            srs = itertools.chain(from_subreddits, [cls.ID(cls.ALL_ADS_ID)])
-        else:
-            srs = from_subreddits
-        for subreddit in srs:
-            current = cls.load_by_sr(subreddit._id)
-            updated = [r for r in current if r.data['link'] != link_fn]
-            cls(subreddit, now, updated).set_as_latest()
+        sbw = cls(srid, timeslot, [], is_srid=is_srid, mutator=mutator)
+        sbw.set_as_latest()
 
 
 # do all the leg work of creating/connecting to tables
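
The change above threads an optional `mutator` through `SponsorBoxWeightings` so that `set_from_weights` can buffer all of its Cassandra writes and flush them with one `send()` instead of issuing one round trip per subreddit. A minimal sketch of that pattern using pycassa's batch API is below; the keyspace and column family names, row keys, and the `write_weights()` helper are illustrative only and are not part of this change.

```python
# Minimal sketch of the batch-write pattern, assuming a locally running
# Cassandra Thrift endpoint. Keyspace/column family names, row keys, and
# the write_weights() helper are hypothetical, not taken from this diff.
import pycassa

pool = pycassa.ConnectionPool('SponsorKeyspace', ['localhost:9160'])
cf = pycassa.ColumnFamily(pool, 'SponsorBoxWeightings')

def write_weights(rowkey, columns, ttl, mutator=None):
    # Mirror of the fallback in __init__: if no batch mutator is supplied,
    # write through the ColumnFamily directly; both expose insert().
    target = mutator or cf
    target.insert(rowkey, columns, ttl=ttl)

# Immediate write: one round trip per call.
write_weights('sr_12345', {'idx': 'sr_12345:20130101'}, ttl=86400)

# Batched writes: buffered client-side, flushed by a single send().
batch = cf.batch()
for rowkey in ('sr_1', 'sr_2', 'sr_3'):
    write_weights(rowkey, {'idx': ''}, ttl=86400, mutator=batch)
batch.send()
```

Because the object returned by `ColumnFamily.batch()` accepts the same `insert(key, columns, ttl=...)` call as the column family itself, the `self.mutator = mutator or self._cf` fallback in `__init__` lets `_set_as_latest` and `set_timeslots` serve both the batched and the immediate write paths without branching.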