mirror of
https://github.com/reddit-archive/reddit.git
synced 2026-04-27 03:00:12 -04:00
Memcached: Implement size check on values
This should relieve the last of the common errors, which are due to trying to store cache values at sizes greater than 1MB.
This commit is contained in:
committed by Brian Simpson
parent 4c6c4c1069
commit 9d7d9316a0
@@ -54,6 +54,8 @@ from r2.lib.cache import (
|
||||
Permacache,
|
||||
SelfEmptyingCache,
|
||||
StaleCacheChain,
|
||||
validate_size_error,
|
||||
validate_size_warn,
|
||||
)
|
||||
from r2.lib.configparse import ConfigValue, ConfigValueParser
|
||||
from r2.lib.contrib import ipaddress
|
||||
@@ -640,81 +642,101 @@ class Globals(object):
|
||||
|
||||
# the main memcache pool. used for most everything.
|
||||
memcache = CMemcache(
|
||||
"main",
|
||||
self.memcaches,
|
||||
min_compress_len=1400,
|
||||
num_clients=num_mc_clients,
|
||||
binary=True,
|
||||
validators=[validate_size_error],
|
||||
)
|
||||
|
||||
# a pool just used for @memoize results
|
||||
memoizecaches = CMemcache(
|
||||
"memoize",
|
||||
self.memoizecaches,
|
||||
min_compress_len=50 * 1024,
|
||||
num_clients=num_mc_clients,
|
||||
binary=True,
|
||||
validators=[validate_size_error],
|
||||
)
|
||||
|
||||
# a pool just for srmember rels
|
||||
srmembercaches = CMemcache(
|
||||
"srmember",
|
||||
self.srmembercaches,
|
||||
min_compress_len=96,
|
||||
num_clients=num_mc_clients,
|
||||
binary=True,
|
||||
validators=[validate_size_error],
|
||||
)
|
||||
|
||||
# a pool just for rels
|
||||
relcaches = CMemcache(
|
||||
"rel",
|
||||
self.relcaches,
|
||||
min_compress_len=96,
|
||||
num_clients=num_mc_clients,
|
||||
binary=True,
|
||||
validators=[validate_size_error],
|
||||
)
|
||||
|
||||
ratelimitcaches = CMemcache(
|
||||
"ratelimit",
|
||||
self.ratelimitcaches,
|
||||
min_compress_len=96,
|
||||
num_clients=num_mc_clients,
|
||||
validators=[validate_size_error],
|
||||
)
|
||||
|
||||
# a smaller pool of caches used only for distributed locks.
|
||||
# TODO: move this to ZooKeeper
|
||||
self.lock_cache = CMemcache(self.lockcaches,
|
||||
self.lock_cache = CMemcache("lock",
|
||||
self.lockcaches,
|
||||
binary=True,
|
||||
num_clients=num_mc_clients)
|
||||
num_clients=num_mc_clients,
|
||||
validators=[validate_size_error],)
|
||||
self.make_lock = make_lock_factory(self.lock_cache, self.stats)
|
||||
|
||||
# memcaches used in front of the permacache CF in cassandra.
|
||||
# XXX: this is a legacy thing; permacache was made when C* didn't have
|
||||
# a row cache.
|
||||
permacache_memcaches = CMemcache(self.permacache_memcaches,
|
||||
permacache_memcaches = CMemcache("perma",
|
||||
self.permacache_memcaches,
|
||||
min_compress_len=1400,
|
||||
num_clients=num_mc_clients)
|
||||
num_clients=num_mc_clients,
|
||||
validators=[validate_size_error],)
|
||||
|
||||
# the stalecache is a memcached local to the current app server used
|
||||
# for data that's frequently fetched but doesn't need to be fresh.
|
||||
if self.stalecaches:
|
||||
stalecaches = CMemcache(self.stalecaches,
|
||||
stalecaches = CMemcache("stale",
|
||||
self.stalecaches,
|
||||
binary=True,
|
||||
num_clients=num_mc_clients)
|
||||
num_clients=num_mc_clients,
|
||||
validators=[validate_size_error],)
|
||||
else:
|
||||
stalecaches = None
|
||||
|
||||
# rendercache holds rendered partial templates.
|
||||
rendercaches = CMemcache(
|
||||
"render",
|
||||
self.rendercaches,
|
||||
noreply=True,
|
||||
no_block=True,
|
||||
num_clients=num_mc_clients,
|
||||
min_compress_len=480,
|
||||
validators=[validate_size_warn],
|
||||
)
|
||||
|
||||
# pagecaches hold fully rendered pages
|
||||
pagecaches = CMemcache(
|
||||
"page",
|
||||
self.pagecaches,
|
||||
noreply=True,
|
||||
no_block=True,
|
||||
num_clients=num_mc_clients,
|
||||
min_compress_len=1400,
|
||||
validators=[validate_size_warn],
|
||||
)
|
||||
|
||||
self.startup_timer.intermediate("memcache")
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
# Inc. All Rights Reserved.
|
||||
###############################################################################
|
||||
|
||||
import sys
|
||||
from threading import local
|
||||
from hashlib import md5
|
||||
import cPickle as pickle
|
||||
@@ -70,8 +71,58 @@ class CacheUtils(object):
|
||||
|
||||
class MemcachedMaximumRetryException(Exception): pass
|
||||
|
||||
class MemcachedValueSizeException(Exception):
    """Raised when a value is too large to store in memcached.

    Carries enough context (pool name, calling method, prefixed key and
    measured size) to identify the offending write in logs.
    """

    def __init__(self, cache_name, caller, prefix, key, size):
        self.cache_name = cache_name
        self.caller = caller
        self.prefix = prefix
        self.key = key
        self.size = size

    def __str__(self):
        detail = (self.cache_name, self.caller, self.prefix, self.key,
                  self.size)
        return ("Memcached %s %s: The object for key '%s%s' is too big for memcached at %s bytes"
                % detail)
|
||||
|
||||
|
||||
# validation functions to be used by memcached pools
MEMCACHED_MAX_VALUE_SIZE = 1024 * 1024  # 1MB


def validate_size_warn(**kwargs):
    """Size-check a prospective cache value, warning instead of failing.

    Returns False (so the caller skips the memcached write) when the value
    is at or over MEMCACHED_MAX_VALUE_SIZE, after bumping a stats counter
    and emitting a debug log line.  Returns True in every other case.

    NOTE(review): sys.getsizeof is shallow -- it does not follow references
    or reflect the serialized wire size; presumably close enough to catch
    > 1MB blobs, but confirm against actual memcached rejections.
    """
    if "value" not in kwargs:
        # No value present (e.g. a get/delete) -- nothing to size-check.
        return True

    size = sys.getsizeof(kwargs["value"])
    if size < MEMCACHED_MAX_VALUE_SIZE:
        return True

    event_key = ".".join((
        "memcached_large_object",
        kwargs.get("cache_name", "undefined")
    ))
    g.stats.simple_event(event_key)
    g.log.debug(
        "Memcached %s: Attempted to cache an object > 1MB at key: '%s%s' of size %s bytes",
        kwargs.get("caller", "unknown"),
        kwargs.get("prefix", ""),
        kwargs.get("key", "undefined"),
        size
    )
    return False
|
||||
|
||||
def validate_size_error(**kwargs):
    """Size-check a prospective cache value, raising on oversize.

    Raises MemcachedValueSizeException when the value is at or over
    MEMCACHED_MAX_VALUE_SIZE; returns True when the value is absent
    (e.g. a get/delete) or within the limit.
    """
    if 'value' in kwargs:
        size = sys.getsizeof(kwargs["value"])
        if size >= MEMCACHED_MAX_VALUE_SIZE:
            # BUG FIX: MemcachedValueSizeException.__init__ takes
            # (cache_name, caller, prefix, key, size); the original call
            # omitted cache_name, so reaching this path raised TypeError
            # instead of the intended exception.
            raise MemcachedValueSizeException(
                kwargs.get("cache_name", "undefined"),
                kwargs.get("caller", "unknown"),
                kwargs.get("prefix", ""),
                kwargs.get("key", "undefined"),
                size
            )

    return True
|
||||
|
||||
class CMemcache(CacheUtils):
|
||||
def __init__(self,
|
||||
name,
|
||||
servers,
|
||||
debug=False,
|
||||
noreply=False,
|
||||
@@ -79,10 +130,14 @@ class CMemcache(CacheUtils):
|
||||
min_compress_len=512 * 1024,
|
||||
num_clients=10,
|
||||
timeout_retry=5,
|
||||
binary=False):
|
||||
binary=False,
|
||||
validators=None):
|
||||
self.name = name
|
||||
self.servers = servers
|
||||
self.clients = pylibmc.ClientPool(n_slots = num_clients)
|
||||
self.timeout_retry = timeout_retry
|
||||
self.validators = validators or []
|
||||
|
||||
for x in xrange(num_clients):
|
||||
client = pylibmc.Client(servers, binary=binary)
|
||||
behaviors = {
|
||||
@@ -125,7 +180,18 @@ class CMemcache(CacheUtils):
|
||||
g.stats.event_count(event_name, "fail")
|
||||
raise MemcachedMaximumRetryException(ex)
|
||||
|
||||
def validate(self, **kwargs):
    """Run every configured validator; True only if all of them pass."""
    # Record which public cache method triggered validation (get/set/...)
    # so validators can report it.
    kwargs['caller'] = sys._getframe().f_back.f_code.co_name
    kwargs['cache_name'] = self.name
    return all(validator(**kwargs) for validator in self.validators)
|
||||
|
||||
def get(self, key, default = None):
|
||||
if not self.validate(key=key):
|
||||
return default
|
||||
|
||||
def do_get():
|
||||
with self.clients.reserve() as mc:
|
||||
ret = mc.get(str(key))
|
||||
@@ -136,9 +202,12 @@ class CMemcache(CacheUtils):
|
||||
return self.retry(self.timeout_retry, do_get)
|
||||
|
||||
def get_multi(self, keys, prefix = ''):
    """Fetch several keys at once, skipping keys that fail validation."""
    validated_keys = []
    for key in keys:
        key = str(key)
        if self.validate(prefix=prefix, key=key):
            validated_keys.append(key)

    def do_get_multi():
        with self.clients.reserve() as mc:
            return mc.get_multi(validated_keys, key_prefix = prefix)

    return self.retry(self.timeout_retry, do_get_multi)
|
||||
|
||||
@@ -151,6 +220,9 @@ class CMemcache(CacheUtils):
|
||||
simple_get_multi = get_multi
|
||||
|
||||
def set(self, key, val, time = 0):
|
||||
if not self.validate(key=key, value=val):
|
||||
return None
|
||||
|
||||
def do_set():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.set(str(key), val, time=time,
|
||||
@@ -159,42 +231,55 @@ class CMemcache(CacheUtils):
|
||||
return self.retry(self.timeout_retry, do_set)
|
||||
|
||||
def set_multi(self, keys, prefix='', time=0):
    """Store a mapping of key -> value, dropping entries that fail validation."""
    validated_keys = {}
    for key, value in keys.iteritems():
        key = str(key)
        if self.validate(prefix=prefix, key=key, value=value):
            validated_keys[key] = value

    def do_set_multi():
        with self.clients.reserve() as mc:
            return mc.set_multi(validated_keys, key_prefix = prefix,
                                time = time,
                                min_compress_len = self.min_compress_len)

    return self.retry(self.timeout_retry, do_set_multi)
|
||||
|
||||
def add_multi(self, keys, prefix='', time=0):
    """Add (store-if-absent) a mapping of key -> value, dropping entries
    that fail validation."""
    validated_keys = {}
    for key, value in keys.iteritems():
        key = str(key)
        if self.validate(prefix=prefix, key=key, value=value):
            validated_keys[key] = value

    with self.clients.reserve() as mc:
        return mc.add_multi(validated_keys, key_prefix = prefix,
                            time = time)
|
||||
|
||||
def incr_multi(self, keys, prefix='', delta=1):
    """Increment several counters at once, skipping keys that fail validation."""
    validated_keys = []
    for key in keys:
        key = str(key)
        if self.validate(prefix=prefix, key=key):
            validated_keys.append(key)

    with self.clients.reserve() as mc:
        return mc.incr_multi(validated_keys,
                             key_prefix = prefix,
                             delta=delta)
|
||||
|
||||
def append(self, key, val, time=0):
    """Append val to the existing value for key; no-op if validation fails."""
    if not self.validate(key=key, value=val):
        return None

    with self.clients.reserve() as mc:
        return mc.append(str(key), val, time=time)
|
||||
|
||||
def incr(self, key, delta=1, time=0):
    """Increment the counter at key by delta; no-op if validation fails.

    The ``time`` argument is accepted for interface compatibility but
    deliberately not passed through.
    """
    if not self.validate(key=key):
        return None

    # ignore the time on these
    with self.clients.reserve() as mc:
        return mc.incr(str(key), delta)
|
||||
|
||||
def add(self, key, val, time=0):
|
||||
if not self.validate(key=key, value=val):
|
||||
return None
|
||||
|
||||
try:
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.add(str(key), val, time=time)
|
||||
@@ -202,6 +287,9 @@ class CMemcache(CacheUtils):
|
||||
return None
|
||||
|
||||
def delete(self, key, time=0):
|
||||
if not self.validate(key=key):
|
||||
return None
|
||||
|
||||
def do_delete():
|
||||
with self.clients.reserve() as mc:
|
||||
return mc.delete(str(key))
|
||||
@@ -209,9 +297,12 @@ class CMemcache(CacheUtils):
|
||||
return self.retry(self.timeout_retry, do_delete)
|
||||
|
||||
def delete_multi(self, keys, prefix=''):
    """Delete several keys at once, skipping keys that fail validation."""
    validated_keys = []
    for key in keys:
        key = str(key)
        if self.validate(prefix=prefix, key=key):
            validated_keys.append(key)

    def do_delete_multi():
        with self.clients.reserve() as mc:
            return mc.delete_multi(validated_keys, key_prefix=prefix)

    return self.retry(self.timeout_retry, do_delete_multi)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user