diff --git a/r2/r2/controllers/ipn.py b/r2/r2/controllers/ipn.py index 97a6510ef..a39c7152d 100644 --- a/r2/r2/controllers/ipn.py +++ b/r2/r2/controllers/ipn.py @@ -36,7 +36,7 @@ from reddit_base import RedditController def get_blob(code): key = "payment_blob-" + code - with g.make_lock("payment_blob_lock-" + code): + with g.make_lock("payment_blob", "payment_blob_lock-" + code): blob = g.hardcache.get(key) if not blob: raise NotFound("No payment_blob-" + code) diff --git a/r2/r2/controllers/validator/validator.py b/r2/r2/controllers/validator/validator.py index 91ce5dbdf..7ecc2bc51 100644 --- a/r2/r2/controllers/validator/validator.py +++ b/r2/r2/controllers/validator/validator.py @@ -1378,7 +1378,7 @@ class VDelay(Validator): prev_violations["duration"] = duration prev_violations["count"] += 1 - with g.make_lock("lock-" + key, timeout=5, verbose=False): + with g.make_lock("record_violation", "lock-" + key, timeout=5, verbose=False): existing = g.memcache.get(key) if existing and existing["count"] > prev_violations["count"]: g.log.warning("Tried to set %s to count=%d, but found existing=%d" diff --git a/r2/r2/lib/app_globals.py b/r2/r2/lib/app_globals.py index ca5f4b7cc..0c2d891fc 100755 --- a/r2/r2/lib/app_globals.py +++ b/r2/r2/lib/app_globals.py @@ -288,10 +288,8 @@ class Globals(object): self.live_config = extract_live_config(parser, self.plugins) self.throttles = tuple() # immutable since it's not real - self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients) - self.make_lock = make_lock_factory(self.lock_cache) - self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients) + self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients) self.stats = Stats(self.config.get('statsd_addr'), self.config.get('statsd_sample_rate')) @@ -301,6 +299,8 @@ class Globals(object): event.listens_for(engine.Engine, 'after_cursor_execute')( self.stats.pg_after_cursor_execute) + self.make_lock = make_lock_factory(self.lock_cache, self.stats) + if not self.cassandra_seeds: raise ValueError("cassandra_seeds not set in the .ini") diff --git a/r2/r2/lib/cache.py b/r2/r2/lib/cache.py index 0d0b0691a..f87cefef2 100644 --- a/r2/r2/lib/cache.py +++ b/r2/r2/lib/cache.py @@ -598,7 +598,7 @@ class CassandraCacheChain(CacheChain): def mutate(self, key, mutation_fn, default = None, willread=True): """Mutate a Cassandra key as atomically as possible""" - with self.make_lock('mutate_%s' % key): + with self.make_lock("permacache_mutate", 'mutate_%s' % key): # we have to do some of the the work of the cache chain # here so that we can be sure that if the value isn't in # memcached (an atomic store), we fetch it from Cassandra diff --git a/r2/r2/lib/comment_tree.py b/r2/r2/lib/comment_tree.py index 8d2d0f07c..d3e622ba5 100755 --- a/r2/r2/lib/comment_tree.py +++ b/r2/r2/lib/comment_tree.py @@ -56,7 +56,7 @@ def add_comments(comments): for link_id, coms in link_map.iteritems(): try: - with g.make_lock(lock_key(link_id)): + with g.make_lock("comment_tree", lock_key(link_id)): add_comments_nolock(link_id, coms) except: g.log.exception( @@ -154,7 +154,7 @@ def update_comment_votes(comments, write_consistency_level = None): write_consistency_level = write_consistency_level) def delete_comment(comment): - with g.make_lock(lock_key(comment.link_id)): + with g.make_lock("comment_tree", lock_key(comment.link_id)): cids, comment_tree, depth, num_children = link_comments(comment.link_id) # only completely remove comments with no children @@ -260,7 +260,7 @@ def link_comments_and_sort(link_id, sort): parents = {} if not parents and len(cids) > 0: - with g.make_lock(lock_key(link_id)): + with g.make_lock("comment_tree", lock_key(link_id)): # reload from the cache so the sorter and parents are # maximally consistent r = g.permacache.get(comments_key(link_id)) @@ -285,7 +285,7 @@ def link_comments(link_id, _update=False): # This operation can take longer than most (note the inner # locks) better to time out request temporarily than to deal # with an inconsistent tree - with g.make_lock(lock_key(link_id), timeout=180): + with g.make_lock("comment_tree", lock_key(link_id), timeout=180): r = _load_link_comments(link_id) # rebuild parent dict cids, cid_tree, depth, num_children, num_comments = r @@ -360,15 +360,15 @@ def messages_lock_key(user_id): def add_message(message): # add the message to the author's list and the recipient - with g.make_lock(messages_lock_key(message.author_id)): + with g.make_lock("message_tree", messages_lock_key(message.author_id)): add_message_nolock(message.author_id, message) if message.to_id: - with g.make_lock(messages_lock_key(message.to_id)): + with g.make_lock("message_tree", messages_lock_key(message.to_id)): add_message_nolock(message.to_id, message) # Messages to a subreddit should end in its inbox. Messages # FROM a subreddit (currently, just ban messages) should NOT if message.sr_id and not message.from_sr: - with g.make_lock(sr_messages_lock_key(message.sr_id)): + with g.make_lock("modmail_tree", sr_messages_lock_key(message.sr_id)): add_sr_message_nolock(message.sr_id, message) diff --git a/r2/r2/lib/db/tdb_cassandra.py b/r2/r2/lib/db/tdb_cassandra.py index 6b0134f2b..f575c131b 100644 --- a/r2/r2/lib/db/tdb_cassandra.py +++ b/r2/r2/lib/db/tdb_cassandra.py @@ -160,7 +160,7 @@ class ThingMeta(type): extra_creation_arguments.update(creation_args) log.warning("Creating Cassandra Column Family %s" % (cf_name,)) - with make_lock('cassandra_schema'): + with make_lock("cassandra_schema", 'cassandra_schema'): manager.create_column_family(keyspace, cf_name, comparator_type = cls._compare_with, super=getattr(cls, '_super', False), diff --git a/r2/r2/lib/db/thing.py b/r2/r2/lib/db/thing.py index 19fbe1757..c794c177a 100644 --- a/r2/r2/lib/db/thing.py +++ b/r2/r2/lib/db/thing.py @@ -238,7 +238,7 @@ class DataThing(object): else: just_created = False - with g.make_lock('commit_' + self._fullname): + with g.make_lock("thing_commit", 'commit_' + self._fullname): if not self._sync_latest(): #sync'd and we have nothing to do now, but we still cache anyway self._cache_myself() @@ -327,7 +327,7 @@ class DataThing(object): (prop, self, self._int_props, self._data_int_props)) raise ValueError, msg - with g.make_lock('commit_' + self._fullname): + with g.make_lock("thing_commit", 'commit_' + self._fullname): self._sync_latest() old_val = getattr(self, prop) if self._defaults.has_key(prop) and self._defaults[prop] == old_val: @@ -1002,7 +1002,7 @@ class Query(object): # it's not in the cache, and we have the power to # update it, which we should do in a lock to prevent # concurrent requests for the same data - with g.make_lock("lock_%s" % self._iden()): + with g.make_lock("thing_query", "lock_%s" % self._iden()): # see if it was set while we were waiting for our # lock names = cache.get(self._iden(), allow_local = False) \ diff --git a/r2/r2/lib/lock.py b/r2/r2/lib/lock.py index 40682052b..dabf812b4 100644 --- a/r2/r2/lib/lock.py +++ b/r2/r2/lib/lock.py @@ -41,10 +41,13 @@ class MemcacheLock(object): attempt to grab a lock by 'adding' the lock name. If the response is True, we have the lock. If it's False, someone else has it.""" - def __init__(self, key, cache, time = 30, timeout = 30, verbose=True): + def __init__(self, stats, group, key, cache, + time=30, timeout=30, verbose=True): # get a thread-local set of locks that we own self.locks = locks.locks = getattr(locks, 'locks', set()) + self.stats = stats + self.group = group self.key = key self.cache = cache self.time = time @@ -61,6 +64,9 @@ class MemcacheLock(object): if self.key in self.locks: return + timer = self.stats.get_timer("lock_wait") + timer.start() + #try and fetch the lock, looping until it's available while not self.cache.add(self.key, my_info, time = self.time): if (datetime.now() - start).seconds > self.timeout: @@ -79,6 +85,8 @@ class MemcacheLock(object): sleep(.01) + timer.stop(subname=self.group) + #tell this thread we have this lock so we can avoid deadlocks #of requests for the same lock in the same thread self.locks.add(self.key) @@ -90,7 +98,7 @@ class MemcacheLock(object): self.cache.delete(self.key) self.locks.remove(self.key) -def make_lock_factory(cache): - def factory(key, **kw): - return MemcacheLock(key, cache, **kw) +def make_lock_factory(cache, stats): + def factory(group, key, **kw): + return MemcacheLock(stats, group, key, cache, **kw) return factory diff --git a/r2/r2/lib/memoize.py b/r2/r2/lib/memoize.py index 4b08249a1..f9056bc67 100644 --- a/r2/r2/lib/memoize.py +++ b/r2/r2/lib/memoize.py @@ -45,7 +45,7 @@ def memoize(iden, time = 0, stale=False): if res is None: # not cached, we should calculate it. - with make_lock('memoize_lock(%s)' % key): + with make_lock("memoize", 'memoize_lock(%s)' % key): # see if it was completed while we were waiting # for the lock stored = None if update else cache.get(key) diff --git a/r2/r2/lib/promote.py b/r2/r2/lib/promote.py index b5e280e1a..6967f74d3 100644 --- a/r2/r2/lib/promote.py +++ b/r2/r2/lib/promote.py @@ -410,7 +410,7 @@ def new_campaign(link, dates, bid, sr): campaign = PromoCampaign._new(link, sr_name, bid, dates[0], dates[1]) # note indx in link.campaigns is the Thing id now indx = campaign._id - with g.make_lock(campaign_lock(link)): + with g.make_lock("promo_campaign", campaign_lock(link)): # get a copy of the attr so that it'll be # marked as dirty on the next write. campaigns = getattr(link, "campaigns", {}).copy() @@ -449,7 +449,7 @@ def edit_campaign(link, campaign_id, dates, bid, sr): campaign.update(dates[0], dates[1], bid, sr_name, campaign.trans_id, commit=True) # dual-write to link attribute in case we need to roll back - with g.make_lock(campaign_lock(link)): + with g.make_lock("promo_campaign", campaign_lock(link)): campaigns = getattr(link, 'campaigns', {}).copy() campaigns[campaign_id] = (dates[0], dates[1], bid, sr_name, campaign.trans_id) link.campaigns = campaigns @@ -480,7 +480,7 @@ def complimentary(username, value = True): a._commit() def delete_campaign(link, index): - with g.make_lock(campaign_lock(link)): + with g.make_lock("promo_campaign", campaign_lock(link)): campaigns = getattr(link, "campaigns", {}).copy() if index in campaigns: PromotionWeights.delete_unfinished(link, index) @@ -557,7 +557,7 @@ def auth_campaign(link, campaign_id, user, pay_id): campaign._commit() # dual-write update to link attribute in case we need to roll back - with g.make_lock(campaign_lock(link)): + with g.make_lock("promo_campaign", campaign_lock(link)): campaigns = getattr(link, "campaigns", {}).copy() if campaign_id in campaigns: campaigns[campaign_id] = (campaign.start_date, campaign.end_date, diff --git a/r2/r2/models/subreddit.py b/r2/r2/models/subreddit.py index c637ccae7..c59ccab8c 100644 --- a/r2/r2/models/subreddit.py +++ b/r2/r2/models/subreddit.py @@ -97,7 +97,7 @@ class Subreddit(Thing, Printable): @classmethod def _new(cls, name, title, author_id, ip, lang = g.lang, type = 'public', over_18 = False, **kw): - with g.make_lock('create_sr_' + name.lower()): + with g.make_lock("create_sr", 'create_sr_' + name.lower()): try: sr = Subreddit._by_name(name) raise SubredditExists