updated memcache.py to 1.43

This commit is contained in:
spez
2009-02-07 14:37:52 -08:00
parent 4e2cd5e316
commit 5b0bb8b88c
3 changed files with 273 additions and 158 deletions

View File

@@ -105,13 +105,13 @@ class Globals(object):
setattr(self, k, v)
# initialize caches
mc = Memcache(self.memcaches)
mc = Memcache(self.memcaches, pickleProtocol = 1)
self.cache = CacheChain((LocalCache(), mc))
self.permacache = Memcache(self.permacaches)
self.rendercache = Memcache(self.rendercaches)
self.permacache = Memcache(self.permacaches, pickleProtocol = 1)
self.rendercache = Memcache(self.rendercaches, pickleProtocol = 1)
self.make_lock = make_lock_factory(mc)
self.rec_cache = Memcache(self.rec_cache)
self.rec_cache = Memcache(self.rec_cache, pickleProtocol = 1)
# set default time zone if one is not set
self.tz = pytz.timezone(global_conf.get('timezone'))

View File

@@ -72,7 +72,7 @@ class Memcache(CacheUtils, memcache.Client):
memcache.Client.delete(self, key, time=time)
def delete_multi(self, keys, prefix='', time=0):
memcache.Client.delete_multi(self, keys, seconds = time,
memcache.Client.delete_multi(self, keys, time = time,
key_prefix = prefix)
class LocalCache(dict, CacheUtils):
@@ -119,11 +119,23 @@ class LocalCache(dict, CacheUtils):
def incr(self, key, amt=1):
if self.has_key(key):
self[key] += amt
self[key] = int(self[key]) + amt
def decr(self, key, amt=1):
if self.has_key(key):
self[key] -= amt
self[key] = int(self[key]) - amt
def append(self, key, val, time = 0):
if self.has_key(key):
self[key] = str(self[key]) + val
def prepend(self, key, val, time = 0):
if self.has_key(key):
self[key] = val + str(self[key])
def replace(self, key, val, time = 0):
if self.has_key(key):
self[key] = val
def flush_all(self):
self.clear()
@@ -139,6 +151,9 @@ class CacheChain(CacheUtils, local):
return fn
set = make_set_fn('set')
append = make_set_fn('append')
prepend = make_set_fn('prepend')
replace = make_set_fn('replace')
set_multi = make_set_fn('set_multi')
add = make_set_fn('add')
incr = make_set_fn('incr')

402
r2/r2/lib/contrib/memcache.py Normal file → Executable file
View File

@@ -46,8 +46,10 @@ More detailed documentation is available in the L{Client} class.
import sys
import socket
import time
import types
import os
from md5 import md5
import re
import types
try:
import cPickle as pickle
except ImportError:
@@ -62,11 +64,16 @@ except ImportError:
def decompress(val):
raise _Error("received compressed data but I don't support compession (import error)")
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from binascii import crc32 # zlib version is not cross-platform
serverHashFunction = crc32
__author__ = "Evan Martin <martine@danga.com>"
__version__ = "1.36"
__version__ = "1.43"
__copyright__ = "Copyright (C) 2003 Danga Interactive"
__license__ = "Python"
@@ -126,19 +133,35 @@ class Client(local):
class MemcachedStringEncodingError(Exception):
pass
def __init__(self, servers, debug=0):
def __init__(self, servers, debug=0, pickleProtocol=0,
pickler=pickle.Pickler, unpickler=pickle.Unpickler,
pload=None, pid=None):
"""
Create a new Client object with the given list of servers.
@param servers: C{servers} is passed to L{set_servers}.
@param debug: whether to display error messages when a server can't be
contacted.
@param pickleProtocol: number to mandate protocol used by (c)Pickle.
@param pickler: optional override of default Pickler to allow subclassing.
@param unpickler: optional override of default Unpickler to allow subclassing.
@param pload: optional persistent_load function to call on pickle loading.
Useful for cPickle since subclassing isn't allowed.
@param pid: optional persistent_id function to call on pickle storing.
Useful for cPickle since subclassing isn't allowed.
"""
local.__init__(self)
self.set_servers(servers)
self.debug = debug
self.stats = {}
# Allow users to modify pickling/unpickling behavior
self.pickleProtocol = pickleProtocol
self.pickler = pickler
self.unpickler = unpickler
self.persistent_load = pload
self.persistent_id = pid
def set_servers(self, servers):
"""
Set the pool of servers used by this client.
@@ -239,7 +262,7 @@ class Client(local):
for s in self.servers:
s.close_socket()
def delete_multi(self, keys, seconds=0, key_prefix=''):
def delete_multi(self, keys, time=0, key_prefix=''):
'''
Delete multiple keys in the memcache doing just one query.
@@ -257,7 +280,7 @@ class Client(local):
the next one.
@param keys: An iterable of keys to clear
@param seconds: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
@param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
@param key_prefix: Optional string to prepend to each key when sending to memcache.
See docs for L{get_multi} and L{set_multi}.
@@ -279,7 +302,7 @@ class Client(local):
write = bigcmd.append
if time != None:
for key in server_keys[server]: # These are mangled keys
write("delete %s %d\r\n" % (key, seconds))
write("delete %s %d\r\n" % (key, time))
else:
for key in server_keys[server]: # These are mangled keys
write("delete %s\r\n" % key)
@@ -287,7 +310,8 @@ class Client(local):
server.send_cmds(''.join(bigcmd))
except socket.error, msg:
rc = 0
server.mark_dead(msg[1])
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
dead_servers.append(server)
# if any servers died on the way, don't expect them to respond.
@@ -300,6 +324,7 @@ class Client(local):
for key in keys:
server.expect("DELETED")
except socket.error, msg:
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
rc = 0
return rc
@@ -308,7 +333,7 @@ class Client(local):
'''Deletes a key from the memcache.
@return: Nonzero on success.
@param seconds: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
@param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
@rtype: int
'''
server, key = self._get_server(key)
@@ -325,7 +350,8 @@ class Client(local):
server.send_cmd(cmd)
server.expect("DELETED")
except socket.error, msg:
server.mark_dead(msg[1])
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
return 0
return 1
@@ -378,10 +404,11 @@ class Client(local):
line = server.readline()
return int(line)
except socket.error, msg:
server.mark_dead(msg[1])
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
return None
def add(self, key, val, time=0):
def add(self, key, val, time = 0, min_compress_len = 0):
'''
Add new key with value.
@@ -390,7 +417,30 @@ class Client(local):
@return: Nonzero on success.
@rtype: int
'''
return self._set("add", key, val, time)
return self._set("add", key, val, time, min_compress_len)
def append(self, key, val, time=0, min_compress_len=0):
'''Append the value to the end of the existing key's value.
Only stores in memcache if key already exists.
Also see L{prepend}.
@return: Nonzero on success.
@rtype: int
'''
return self._set("append", key, val, time, min_compress_len)
def prepend(self, key, val, time=0, min_compress_len=0):
'''Prepend the value to the beginning of the existing key's value.
Only stores in memcache if key already exists.
Also see L{append}.
@return: Nonzero on success.
@rtype: int
'''
return self._set("prepend", key, val, time, min_compress_len)
def replace(self, key, val, time=0, min_compress_len=0):
'''Replace existing key with value.
@@ -401,14 +451,16 @@ class Client(local):
@rtype: int
'''
return self._set("replace", key, val, time, min_compress_len)
def set(self, key, val, time=0, min_compress_len=0):
'''Unconditionally sets a key to a given value in the memcache.
The C{key} can optionally be an tuple, with the first element being the
hash value, if you want to avoid making this module calculate a hash value.
You may prefer, for example, to keep all of a given user's objects on the
same memcache server, so you could use the user's unique id as the hash
value.
The C{key} can optionally be an tuple, with the first element
being the server hash value and the second being the key.
If you want to avoid making this module calculate a hash value.
You may prefer, for example, to keep all of a given user's objects
on the same memcache server, so you could use the user's unique
id as the hash value.
@return: Nonzero on success.
@rtype: int
@@ -530,13 +582,17 @@ class Client(local):
write("set %s %d %d %d\r\n%s\r\n" % (key, store_info[0], time, store_info[1], store_info[2]))
server.send_cmds(''.join(bigcmd))
except socket.error, msg:
server.mark_dead(msg[1])
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
dead_servers.append(server)
# if any servers died on the way, don't expect them to respond.
for server in dead_servers:
del server_keys[server]
# short-circuit if there are no servers, just return all keys
if not server_keys: return(mapping.keys())
notstored = [] # original keys.
for server, keys in server_keys.iteritems():
try:
@@ -547,11 +603,11 @@ class Client(local):
else:
notstored.append(prefixed_to_orig_key[key]) #un-mangle.
except (_Error, socket.error), msg:
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
return notstored
@staticmethod
def _val_to_store_info(val, min_compress_len):
def _val_to_store_info(self, val, min_compress_len):
"""
Transform val to a storable representation, returning a tuple of the flags, the length of the new value, and the new value itself.
"""
@@ -570,7 +626,12 @@ class Client(local):
min_compress_len = 0
else:
flags |= Client._FLAG_PICKLE
val = pickle.dumps(val, 1) # Ack! JLR hacks it so that LinkedDict unpicling works w/o figuring out __reduce__.
file = StringIO()
pickler = self.pickler(file, protocol=self.pickleProtocol)
if self.persistent_id:
pickler.persistent_id = self.persistent_id
pickler.dump(val)
val = file.getvalue()
# silently do not store if value length exceeds maximum
if len(val) >= SERVER_MAX_VALUE_LENGTH:
@@ -587,7 +648,7 @@ class Client(local):
return (flags, len(val), val)
def _set(self, cmd, key, val, time, min_compress_len=0):
def _set(self, cmd, key, val, time, min_compress_len = 0):
server, key = self._get_server(key)
key = check_key(key)
if not server:
@@ -604,7 +665,8 @@ class Client(local):
server.send_cmd(fullcmd)
return(server.expect("STORED") == "STORED")
except socket.error, msg:
server.mark_dead(msg[1])
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
return 0
def get(self, key):
@@ -627,8 +689,7 @@ class Client(local):
value = self._recv_value(server, flags, rlen)
server.expect("END")
except (_Error, socket.error), msg:
if type(msg) is types.TupleType:
msg = msg[1]
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
return None
return value
@@ -682,7 +743,8 @@ class Client(local):
try:
server.send_cmd("get %s" % " ".join(server_keys[server]))
except socket.error, msg:
server.mark_dead(msg[1])
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
dead_servers.append(server)
# if any servers died on the way, don't expect them to respond.
@@ -701,8 +763,9 @@ class Client(local):
retvals[prefixed_to_orig_key[rkey]] = val # un-prefix returned key.
line = server.readline()
except (_Error, socket.error), msg:
server.mark_dead(msg)
if type(msg) is types.TupleType: msg = msg[1]
server.mark_dead(msg)
return retvals
def _expectvalue(self, server, line=None):
@@ -739,9 +802,13 @@ class Client(local):
val = long(buf)
elif flags & Client._FLAG_PICKLE:
try:
val = pickle.loads(buf)
except:
self.debuglog('Pickle error...\n')
file = StringIO(buf)
unpickler = self.unpickler(file)
if self.persistent_load:
unpickler.persistent_load = self.persistent_load
val = unpickler.load()
except Exception, e:
self.debuglog('Pickle error: %s\n' % e)
val = None
else:
self.debuglog("unknown flags on get: %x\n" % flags)
@@ -751,6 +818,7 @@ class Client(local):
class _Host:
_DEAD_RETRY = 1 # number of seconds before retrying a dead server.
_SOCKET_TIMEOUT = 3 # number of seconds before sockets timeout.
def __init__(self, host, debugfunc=None):
if isinstance(host, types.TupleType):
@@ -758,11 +826,24 @@ class _Host:
else:
self.weight = 1
if host.find(":") > 0:
self.ip, self.port = host.split(":")
self.port = int(self.port)
# parse the connection string
m = re.match(r'^(?P<proto>unix):(?P<path>.*)$', host)
if not m:
m = re.match(r'^(?P<proto>inet):'
r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
if not m: m = re.match(r'^(?P<host>[^:]+):(?P<port>[0-9]+)$', host)
if not m:
raise ValueError('Unable to parse connection string: "%s"' % host)
hostData = m.groupdict()
if hostData.get('proto') == 'unix':
self.family = socket.AF_UNIX
self.address = hostData['path']
else:
self.ip, self.port = host, 11211
self.family = socket.AF_INET
self.ip = hostData['host']
self.port = int(hostData.get('port', 11211))
self.address = ( self.ip, self.port )
if not debugfunc:
debugfunc = lambda x: x
@@ -794,11 +875,15 @@ class _Host:
return None
if self.socket:
return self.socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Python 2.3-ism: s.settimeout(1)
s = socket.socket(self.family, socket.SOCK_STREAM)
if hasattr(s, 'settimeout'): s.settimeout(self._SOCKET_TIMEOUT)
try:
s.connect((self.ip, self.port))
s.connect(self.address)
except socket.timeout, msg:
self.mark_dead("connect: %s" % msg)
return None
except socket.error, msg:
if type(msg) is types.TupleType: msg = msg[1]
self.mark_dead("connect: %s" % msg[1])
return None
self.socket = s
@@ -859,7 +944,11 @@ class _Host:
d = ''
if self.deaduntil:
d = " (dead until %d)" % self.deaduntil
return "%s:%d%s" % (self.ip, self.port, d)
if self.family == socket.AF_INET:
return "inet:%s:%d%s" % (self.address[0], self.address[1], d)
else:
return "unix:%s%s" % (self.address, d)
def check_key(key, key_extra_len=0):
"""Checks sanity of key. Fails if:
@@ -867,19 +956,12 @@ def check_key(key, key_extra_len=0):
Contains control characters (Raises MemcachedKeyCharacterError).
Is not a string (Raises MemcachedStringEncodingError)
"""
if type(key) == types.TupleType: key = key[1]
if not isinstance(key, str):
raise Client.MemcachedStringEncodingError, ("Keys must be str()'s, not"
"unicode. Convert your unicode strings using "
"mystring.encode(charset)!")
#if isinstance(key, basestring):
#if len(key) + key_extra_len > SERVER_MAX_KEY_LENGTH:
#raise Client.MemcachedKeyLengthError, ("Key length is > %s"
#% SERVER_MAX_KEY_LENGTH)
#for char in key:
#if ord(char) < 32:
#raise Client.MemcachedKeyCharacterError, "Control characters not allowed"
return md5(key).hexdigest()
def _doctest():
@@ -894,125 +976,143 @@ if __name__ == "__main__":
_doctest()
print "Running tests:"
print
#servers = ["127.0.0.1:11211", "127.0.0.1:11212"]
servers = ["127.0.0.1:11211"]
mc = Client(servers, debug=1)
serverList = [["127.0.0.1:11211"]]
if '--do-unix' in sys.argv:
serverList.append([os.path.join(os.getcwd(), 'memcached.socket')])
def to_s(val):
if not isinstance(val, types.StringTypes):
return "%s (%s)" % (val, type(val))
return "%s" % val
def test_setget(key, val):
print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)),
mc.set(key, val)
newval = mc.get(key)
if newval == val:
print "OK"
return 1
else:
print "FAIL"
return 0
for servers in serverList:
mc = Client(servers, debug=1)
def to_s(val):
if not isinstance(val, types.StringTypes):
return "%s (%s)" % (val, type(val))
return "%s" % val
def test_setget(key, val):
print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)),
mc.set(key, val)
newval = mc.get(key)
if newval == val:
print "OK"
return 1
else:
print "FAIL"
return 0
class FooStruct:
def __init__(self):
self.bar = "baz"
def __str__(self):
return "A FooStruct"
def __eq__(self, other):
if isinstance(other, FooStruct):
return self.bar == other.bar
return 0
class FooStruct:
def __init__(self):
self.bar = "baz"
def __str__(self):
return "A FooStruct"
def __eq__(self, other):
if isinstance(other, FooStruct):
return self.bar == other.bar
return 0
test_setget("a_string", "some random string")
test_setget("an_integer", 42)
if test_setget("long", long(1<<30)):
print "Testing delete ...",
if mc.delete("long"):
test_setget("a_string", "some random string")
test_setget("an_integer", 42)
if test_setget("long", long(1<<30)):
print "Testing delete ...",
if mc.delete("long"):
print "OK"
else:
print "FAIL"
print "Testing get_multi ...",
print mc.get_multi(["a_string", "an_integer"])
print "Testing get(unknown value) ...",
print to_s(mc.get("unknown_value"))
f = FooStruct()
test_setget("foostruct", f)
print "Testing incr ...",
x = mc.incr("an_integer", 1)
if x == 43:
print "OK"
else:
print "FAIL"
print "Testing get_multi ...",
print mc.get_multi(["a_string", "an_integer"])
print "Testing get(unknown value) ...",
print to_s(mc.get("unknown_value"))
print "Testing decr ...",
x = mc.decr("an_integer", 1)
if x == 42:
print "OK"
else:
print "FAIL"
f = FooStruct()
test_setget("foostruct", f)
# sanity tests
print "Testing sending spaces...",
try:
x = mc.set("this has spaces", 1)
except Client.MemcachedKeyCharacterError, msg:
print "OK"
else:
print "FAIL"
print "Testing incr ...",
x = mc.incr("an_integer", 1)
if x == 43:
print "OK"
else:
print "FAIL"
print "Testing sending control characters...",
try:
x = mc.set("this\x10has\x11control characters\x02", 1)
except Client.MemcachedKeyCharacterError, msg:
print "OK"
else:
print "FAIL"
print "Testing decr ...",
x = mc.decr("an_integer", 1)
if x == 42:
print "OK"
else:
print "FAIL"
print "Testing using insanely long key...",
try:
x = mc.set('a'*SERVER_MAX_KEY_LENGTH + 'aaaa', 1)
except Client.MemcachedKeyLengthError, msg:
print "OK"
else:
print "FAIL"
# sanity tests
print "Testing sending spaces...",
try:
x = mc.set("this has spaces", 1)
except Client.MemcachedKeyCharacterError, msg:
print "OK"
else:
print "FAIL"
print "Testing sending a unicode-string key...",
try:
x = mc.set(u'keyhere', 1)
except Client.MemcachedStringEncodingError, msg:
print "OK",
else:
print "FAIL",
try:
x = mc.set((u'a'*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1)
except:
print "FAIL",
else:
print "OK",
import pickle
s = pickle.loads('V\\u4f1a\np0\n.')
try:
x = mc.set((s*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1)
except Client.MemcachedKeyLengthError:
print "OK"
else:
print "FAIL"
print "Testing sending control characters...",
try:
x = mc.set("this\x10has\x11control characters\x02", 1)
except Client.MemcachedKeyCharacterError, msg:
print "OK"
else:
print "FAIL"
print "Testing using a value larger than the memcached value limit...",
x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH)
if mc.get('keyhere') == None:
print "OK",
else:
print "FAIL",
x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH + 'aaa')
if mc.get('keyhere') == None:
print "OK"
else:
print "FAIL"
print "Testing using insanely long key...",
try:
x = mc.set('a'*SERVER_MAX_KEY_LENGTH + 'aaaa', 1)
except Client.MemcachedKeyLengthError, msg:
print "OK"
else:
print "FAIL"
print "Testing sending a unicode-string key...",
try:
x = mc.set(u'keyhere', 1)
except Client.MemcachedStringEncodingError, msg:
print "OK",
else:
print "FAIL",
try:
x = mc.set((u'a'*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1)
except:
print "FAIL",
else:
print "OK",
import pickle
s = pickle.loads('V\\u4f1a\np0\n.')
try:
x = mc.set((s*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1)
except Client.MemcachedKeyLengthError:
print "OK"
else:
print "FAIL"
print "Testing using a value larger than the memcached value limit...",
x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH)
if mc.get('keyhere') == None:
print "OK",
else:
print "FAIL",
x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH + 'aaa')
if mc.get('keyhere') == None:
print "OK"
else:
print "FAIL"
print "Testing set_multi() with no memcacheds running",
mc.disconnect_all()
errors = mc.set_multi({'keyhere' : 'a', 'keythere' : 'b'})
if errors != []:
print "FAIL"
else:
print "OK"
print "Testing delete_multi() with no memcacheds running",
mc.disconnect_all()
ret = mc.delete_multi({'keyhere' : 'a', 'keythere' : 'b'})
if ret != 1:
print "FAIL"
else:
print "OK"
# vim: ts=4 sw=4 et :