mirror of
https://github.com/reddit-archive/reddit.git
synced 2026-04-27 03:00:12 -04:00
Memcached: Implement retry on failure.
This commit is contained in:
committed by
Neil Williams
parent
19c269a14c
commit
d6a74a0164
@@ -25,6 +25,8 @@ from hashlib import md5
|
||||
import cPickle as pickle
|
||||
from copy import copy
|
||||
|
||||
from pylons import g
|
||||
|
||||
import pylibmc
|
||||
from _pylibmc import MemcachedError
|
||||
|
||||
@@ -37,6 +39,7 @@ from r2.lib.hardcachebackend import HardCacheBackend
|
||||
|
||||
from r2.lib.sgm import sgm # get this into our namespace so that it's
|
||||
# importable from us
|
||||
from r2.config import feature
|
||||
|
||||
# This is for use in the health controller
|
||||
_CACHE_SERVERS = set()
|
||||
@@ -62,6 +65,9 @@ class CacheUtils(object):
|
||||
def get_multi(self, keys, prefix='', **kw):
|
||||
return prefix_keys(keys, prefix, lambda k: self.simple_get_multi(k, **kw))
|
||||
|
||||
|
||||
class MemcachedMaximumRetryException(Exception): pass
|
||||
|
||||
class CMemcache(CacheUtils):
|
||||
def __init__(self,
|
||||
servers,
|
||||
@@ -70,9 +76,11 @@ class CMemcache(CacheUtils):
|
||||
no_block=False,
|
||||
min_compress_len=512 * 1024,
|
||||
num_clients=10,
|
||||
timeout_retry=5,
|
||||
binary=False):
|
||||
self.servers = servers
|
||||
self.clients = pylibmc.ClientPool(n_slots = num_clients)
|
||||
self.timeout_retry = timeout_retry
|
||||
for x in xrange(num_clients):
|
||||
client = pylibmc.Client(servers, binary=binary)
|
||||
behaviors = {
|
||||
@@ -91,16 +99,43 @@ class CMemcache(CacheUtils):
|
||||
|
||||
_CACHE_SERVERS.update(servers)
|
||||
|
||||
|
||||
def retry(self, times, fn):
|
||||
if feature.is_enabled('memcached_retry'):
|
||||
ex = None
|
||||
for i in xrange(times):
|
||||
try:
|
||||
return fn()
|
||||
except (pylibmc.NotFound,
|
||||
pylibmc.BadKeyProvided,
|
||||
pylibmc.UnknownStatKey,
|
||||
pylibmc.InvalidHostProtocolError,
|
||||
pylibmc.NotSupportedError):
|
||||
raise
|
||||
except MemcachedError as e:
|
||||
ex = e
|
||||
g.log.info('Memcached error, retrying: %r', e)
|
||||
|
||||
raise MemcachedMaximumRetryException(ex)
|
||||
else:
|
||||
return fn()
|
||||
|
||||
def get(self, key, default = None):
|
||||
with self.clients.reserve() as mc:
|
||||
ret = mc.get(key)
|
||||
if ret is None:
|
||||
return default
|
||||
return ret
|
||||
def do_get():
|
||||
with self.clients.reserve() as mc:
|
||||
ret = mc.get(key)
|
||||
if ret is None:
|
||||
return default
|
||||
return ret
|
||||
|
||||
return self.retry(self.timeout_retry, do_get)
|
||||
|
||||
def get_multi(self, keys, prefix = ''):
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.get_multi(keys, key_prefix = prefix)
|
||||
def do_get_multi():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.get_multi(keys, key_prefix = prefix)
|
||||
|
||||
return self.retry(self.timeout_retry, do_get_multi)
|
||||
|
||||
# simple_get_multi exists so that a cache chain can
|
||||
# single-instance the handling of prefixes for performance, but
|
||||
@@ -111,18 +146,25 @@ class CMemcache(CacheUtils):
|
||||
simple_get_multi = get_multi
|
||||
|
||||
def set(self, key, val, time = 0):
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.set(key, val, time = time,
|
||||
min_compress_len = self.min_compress_len)
|
||||
def do_set():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.set(key, val, time = time,
|
||||
min_compress_len = self.min_compress_len)
|
||||
|
||||
return self.retry(self.timeout_retry, do_set)
|
||||
|
||||
def set_multi(self, keys, prefix='', time=0):
|
||||
new_keys = {}
|
||||
for k,v in keys.iteritems():
|
||||
new_keys[str(k)] = v
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.set_multi(new_keys, key_prefix = prefix,
|
||||
time = time,
|
||||
min_compress_len = self.min_compress_len)
|
||||
|
||||
def do_set_multi():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.set_multi(new_keys, key_prefix = prefix,
|
||||
time = time,
|
||||
min_compress_len = self.min_compress_len)
|
||||
|
||||
return self.retry(self.timeout_retry, do_set_multi)
|
||||
|
||||
def add_multi(self, keys, prefix='', time=0):
|
||||
new_keys = {}
|
||||
@@ -155,12 +197,18 @@ class CMemcache(CacheUtils):
|
||||
return None
|
||||
|
||||
def delete(self, key, time=0):
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.delete(key)
|
||||
def do_delete():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.delete(key)
|
||||
|
||||
return self.retry(self.timeout_retry, do_delete)
|
||||
|
||||
def delete_multi(self, keys, prefix=''):
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.delete_multi(keys, key_prefix=prefix)
|
||||
def do_delete_multi():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.delete_multi(keys, key_prefix=prefix)
|
||||
|
||||
return self.retry(self.timeout_retry, do_delete_multi)
|
||||
|
||||
def __repr__(self):
|
||||
return '<%s(%r)>' % (self.__class__.__name__,
|
||||
|
||||
Reference in New Issue
Block a user