mirror of https://github.com/reddit-archive/reddit.git
make_lock: Add Graphite tracking of lock wait times.
This commit deliberately does not muck with the keys used for locking, so we don't need downtime to get these statistics.
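Every call site changes the same way: g.make_lock() gains a leading group name used only for stats, while the memcache key stays exactly as it was. A before/after sketch of the call shape, taken from the comment-tree call site in the diff below:

    # before: the lock is identified only by its memcache key
    with g.make_lock(lock_key(link_id)):
        add_comments_nolock(link_id, coms)

    # after: a stats group is prepended; the key (and so the lock itself)
    # is unchanged, which is why old and new app servers keep contending
    # on the same memcache entries during a rolling deploy
    with g.make_lock("comment_tree", lock_key(link_id)):
        add_comments_nolock(link_id, coms)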
@@ -36,7 +36,7 @@ from reddit_base import RedditController

 def get_blob(code):
     key = "payment_blob-" + code
-    with g.make_lock("payment_blob_lock-" + code):
+    with g.make_lock("payment_blob", "payment_blob_lock-" + code):
         blob = g.hardcache.get(key)
         if not blob:
             raise NotFound("No payment_blob-" + code)
@@ -1378,7 +1378,7 @@ class VDelay(Validator):
         prev_violations["duration"] = duration
         prev_violations["count"] += 1

-        with g.make_lock("lock-" + key, timeout=5, verbose=False):
+        with g.make_lock("record_violation", "lock-" + key, timeout=5, verbose=False):
             existing = g.memcache.get(key)
             if existing and existing["count"] > prev_violations["count"]:
                 g.log.warning("Tried to set %s to count=%d, but found existing=%d"
@@ -288,10 +288,8 @@ class Globals(object):
         self.live_config = extract_live_config(parser, self.plugins)
         self.throttles = tuple() # immutable since it's not real

-        self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients)
-        self.make_lock = make_lock_factory(self.lock_cache)
-
         self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients)
+        self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients)

         self.stats = Stats(self.config.get('statsd_addr'),
                            self.config.get('statsd_sample_rate'))
@@ -301,6 +299,8 @@ class Globals(object):
             event.listens_for(engine.Engine, 'after_cursor_execute')(
                 self.stats.pg_after_cursor_execute)

+        self.make_lock = make_lock_factory(self.lock_cache, self.stats)
+
         if not self.cassandra_seeds:
             raise ValueError("cassandra_seeds not set in the .ini")
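The reordering in the two Globals hunks above exists because the lock factory now closes over the stats client: make_lock_factory(self.lock_cache, self.stats) can only run after Stats is constructed. Pieced together from the two hunks, the resulting initialization order in Globals.__init__ is roughly (a sketch, not the full method):

    self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients)
    self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients)

    self.stats = Stats(self.config.get('statsd_addr'),
                       self.config.get('statsd_sample_rate'))

    self.make_lock = make_lock_factory(self.lock_cache, self.stats)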
@@ -598,7 +598,7 @@ class CassandraCacheChain(CacheChain):

     def mutate(self, key, mutation_fn, default = None, willread=True):
         """Mutate a Cassandra key as atomically as possible"""
-        with self.make_lock('mutate_%s' % key):
+        with self.make_lock("permacache_mutate", 'mutate_%s' % key):
             # we have to do some of the the work of the cache chain
             # here so that we can be sure that if the value isn't in
             # memcached (an atomic store), we fetch it from Cassandra
@@ -56,7 +56,7 @@ def add_comments(comments):

     for link_id, coms in link_map.iteritems():
         try:
-            with g.make_lock(lock_key(link_id)):
+            with g.make_lock("comment_tree", lock_key(link_id)):
                 add_comments_nolock(link_id, coms)
         except:
             g.log.exception(
@@ -154,7 +154,7 @@ def update_comment_votes(comments, write_consistency_level = None):
                           write_consistency_level = write_consistency_level)

 def delete_comment(comment):
-    with g.make_lock(lock_key(comment.link_id)):
+    with g.make_lock("comment_tree", lock_key(comment.link_id)):
         cids, comment_tree, depth, num_children = link_comments(comment.link_id)

         # only completely remove comments with no children
@@ -260,7 +260,7 @@ def link_comments_and_sort(link_id, sort):
             parents = {}

     if not parents and len(cids) > 0:
-        with g.make_lock(lock_key(link_id)):
+        with g.make_lock("comment_tree", lock_key(link_id)):
             # reload from the cache so the sorter and parents are
             # maximally consistent
             r = g.permacache.get(comments_key(link_id))
@@ -285,7 +285,7 @@ def link_comments(link_id, _update=False):
         # This operation can take longer than most (note the inner
         # locks) better to time out request temporarily than to deal
         # with an inconsistent tree
-        with g.make_lock(lock_key(link_id), timeout=180):
+        with g.make_lock("comment_tree", lock_key(link_id), timeout=180):
             r = _load_link_comments(link_id)
             # rebuild parent dict
             cids, cid_tree, depth, num_children, num_comments = r
@@ -360,15 +360,15 @@ def messages_lock_key(user_id):

 def add_message(message):
     # add the message to the author's list and the recipient
-    with g.make_lock(messages_lock_key(message.author_id)):
+    with g.make_lock("message_tree", messages_lock_key(message.author_id)):
         add_message_nolock(message.author_id, message)
     if message.to_id:
-        with g.make_lock(messages_lock_key(message.to_id)):
+        with g.make_lock("message_tree", messages_lock_key(message.to_id)):
             add_message_nolock(message.to_id, message)

     # Messages to a subreddit should end in its inbox. Messages
     # FROM a subreddit (currently, just ban messages) should NOT
     if message.sr_id and not message.from_sr:
-        with g.make_lock(sr_messages_lock_key(message.sr_id)):
+        with g.make_lock("modmail_tree", sr_messages_lock_key(message.sr_id)):
             add_sr_message_nolock(message.sr_id, message)
@@ -160,7 +160,7 @@ class ThingMeta(type):
                 extra_creation_arguments.update(creation_args)

             log.warning("Creating Cassandra Column Family %s" % (cf_name,))
-            with make_lock('cassandra_schema'):
+            with make_lock("cassandra_schema", 'cassandra_schema'):
                 manager.create_column_family(keyspace, cf_name,
                                              comparator_type = cls._compare_with,
                                              super=getattr(cls, '_super', False),
@@ -238,7 +238,7 @@ class DataThing(object):
         else:
             just_created = False

-        with g.make_lock('commit_' + self._fullname):
+        with g.make_lock("thing_commit", 'commit_' + self._fullname):
             if not self._sync_latest():
                 #sync'd and we have nothing to do now, but we still cache anyway
                 self._cache_myself()
@@ -327,7 +327,7 @@ class DataThing(object):
                    (prop, self, self._int_props, self._data_int_props))
             raise ValueError, msg

-        with g.make_lock('commit_' + self._fullname):
+        with g.make_lock("thing_commit", 'commit_' + self._fullname):
             self._sync_latest()
             old_val = getattr(self, prop)
             if self._defaults.has_key(prop) and self._defaults[prop] == old_val:
@@ -1002,7 +1002,7 @@ class Query(object):
             # it's not in the cache, and we have the power to
             # update it, which we should do in a lock to prevent
             # concurrent requests for the same data
-            with g.make_lock("lock_%s" % self._iden()):
+            with g.make_lock("thing_query", "lock_%s" % self._iden()):
                 # see if it was set while we were waiting for our
                 # lock
                 names = cache.get(self._iden(), allow_local = False) \
@@ -41,10 +41,13 @@ class MemcacheLock(object):
     attempt to grab a lock by 'adding' the lock name. If the response
     is True, we have the lock. If it's False, someone else has it."""

-    def __init__(self, key, cache, time = 30, timeout = 30, verbose=True):
+    def __init__(self, stats, group, key, cache,
+                 time=30, timeout=30, verbose=True):
         # get a thread-local set of locks that we own
         self.locks = locks.locks = getattr(locks, 'locks', set())

+        self.stats = stats
+        self.group = group
         self.key = key
         self.cache = cache
         self.time = time
@@ -61,6 +64,9 @@ class MemcacheLock(object):
         if self.key in self.locks:
             return

+        timer = self.stats.get_timer("lock_wait")
+        timer.start()
+
         #try and fetch the lock, looping until it's available
         while not self.cache.add(self.key, my_info, time = self.time):
             if (datetime.now() - start).seconds > self.timeout:
@@ -79,6 +85,8 @@ class MemcacheLock(object):

             sleep(.01)

+        timer.stop(subname=self.group)
+
         #tell this thread we have this lock so we can avoid deadlocks
         #of requests for the same lock in the same thread
         self.locks.add(self.key)
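The diff never shows the Stats timer itself, only how it is driven: get_timer("lock_wait"), start() after the re-entrancy check, and stop(subname=self.group) once the memcache add succeeds. A minimal stand-in, assuming a statsd-style client where the subname is appended to the metric path so that each lock group becomes its own Graphite series (e.g. lock_wait.comment_tree, lock_wait.thing_commit); the names here are assumptions, not the real Stats API:

    import time

    class LockWaitTimer(object):
        """Hypothetical stand-in for the object stats.get_timer() returns."""

        def __init__(self, name, send):
            self.name = name    # e.g. "lock_wait"
            self.send = send    # callable(metric_path, elapsed_ms)
            self._start = None

        def start(self):
            self._start = time.time()

        def stop(self, subname=None):
            elapsed_ms = (time.time() - self._start) * 1000.0
            path = "%s.%s" % (self.name, subname) if subname else self.name
            # the real Stats object would hand this to a statsd client,
            # which Graphite then aggregates per metric path
            self.send(path, elapsed_ms)

Note what gets measured: the timer brackets only the add() retry loop, so it records pure wait-for-lock time (near zero on an uncontended acquire), and the early return for a lock this thread already holds never starts a timer at all.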
@@ -90,7 +98,7 @@ class MemcacheLock(object):
         self.cache.delete(self.key)
         self.locks.remove(self.key)

-def make_lock_factory(cache):
-    def factory(key, **kw):
-        return MemcacheLock(key, cache, **kw)
+def make_lock_factory(cache, stats):
+    def factory(group, key, **kw):
+        return MemcacheLock(stats, group, key, cache, **kw)
     return factory
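With the factory change, callers supply (group, key) and the factory supplies the (cache, stats) pair it closed over at startup. A sketch of how the pieces plug together, under the assumption that MemcacheLock and make_lock_factory can be imported standalone; FakeCache and NullStats are hypothetical test doubles, not part of the diff:

    class FakeCache(object):
        """Dict-backed stand-in for the lock memcache client; add() is the
        atomic add-if-absent primitive MemcacheLock spins on."""
        def __init__(self):
            self.d = {}
        def add(self, key, val, time=0):
            if key in self.d:
                return False
            self.d[key] = val
            return True
        def delete(self, key):
            self.d.pop(key, None)

    class NullTimer(object):
        def start(self):
            pass
        def stop(self, subname=None):
            pass

    class NullStats(object):
        def get_timer(self, name):
            return NullTimer()

    make_lock = make_lock_factory(FakeCache(), NullStats())
    with make_lock("comment_tree", "lock-example", timeout=5):
        pass  # critical section; release() deletes the key on exit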
@@ -45,7 +45,7 @@ def memoize(iden, time = 0, stale=False):

         if res is None:
             # not cached, we should calculate it.
-            with make_lock('memoize_lock(%s)' % key):
+            with make_lock("memoize", 'memoize_lock(%s)' % key):
                 # see if it was completed while we were waiting
                 # for the lock
                 stored = None if update else cache.get(key)
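The context lines around the memoize lock show the standard double-checked pattern: check the cache, take the lock, check again ("see if it was completed while we were waiting for the lock"), and only then compute. Distilled into a self-contained sketch with hypothetical names (the real decorator also handles key construction, forced updates, and staleness):

    def get_or_compute(cache, make_lock, key, compute):
        res = cache.get(key)
        if res is None:
            # not cached; compute under a lock so concurrent callers
            # don't all redo the work
            with make_lock("memoize", 'memoize_lock(%s)' % key):
                # double-check: another lock holder may have filled
                # the cache while we were waiting
                res = cache.get(key)
                if res is None:
                    res = compute()
                    cache.set(key, res)
        return res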
@@ -410,7 +410,7 @@ def new_campaign(link, dates, bid, sr):
     campaign = PromoCampaign._new(link, sr_name, bid, dates[0], dates[1])
     # note indx in link.campaigns is the Thing id now
     indx = campaign._id
-    with g.make_lock(campaign_lock(link)):
+    with g.make_lock("promo_campaign", campaign_lock(link)):
         # get a copy of the attr so that it'll be
         # marked as dirty on the next write.
         campaigns = getattr(link, "campaigns", {}).copy()
@@ -449,7 +449,7 @@ def edit_campaign(link, campaign_id, dates, bid, sr):
     campaign.update(dates[0], dates[1], bid, sr_name, campaign.trans_id, commit=True)

     # dual-write to link attribute in case we need to roll back
-    with g.make_lock(campaign_lock(link)):
+    with g.make_lock("promo_campaign", campaign_lock(link)):
         campaigns = getattr(link, 'campaigns', {}).copy()
         campaigns[campaign_id] = (dates[0], dates[1], bid, sr_name, campaign.trans_id)
         link.campaigns = campaigns
@@ -480,7 +480,7 @@ def complimentary(username, value = True):
     a._commit()

 def delete_campaign(link, index):
-    with g.make_lock(campaign_lock(link)):
+    with g.make_lock("promo_campaign", campaign_lock(link)):
         campaigns = getattr(link, "campaigns", {}).copy()
         if index in campaigns:
             PromotionWeights.delete_unfinished(link, index)
@@ -557,7 +557,7 @@ def auth_campaign(link, campaign_id, user, pay_id):
     campaign._commit()

     # dual-write update to link attribute in case we need to roll back
-    with g.make_lock(campaign_lock(link)):
+    with g.make_lock("promo_campaign", campaign_lock(link)):
         campaigns = getattr(link, "campaigns", {}).copy()
         if campaign_id in campaigns:
             campaigns[campaign_id] = (campaign.start_date, campaign.end_date,
@@ -97,7 +97,7 @@ class Subreddit(Thing, Printable):
     @classmethod
     def _new(cls, name, title, author_id, ip, lang = g.lang, type = 'public',
              over_18 = False, **kw):
-        with g.make_lock('create_sr_' + name.lower()):
+        with g.make_lock("create_sr", 'create_sr_' + name.lower()):
             try:
                 sr = Subreddit._by_name(name)
                 raise SubredditExists