From 5b0bb8b88cb813d5d93685070feaaece31252c76 Mon Sep 17 00:00:00 2001 From: spez Date: Sat, 7 Feb 2009 14:37:52 -0800 Subject: [PATCH] updated memcache.py to 1.43 --- r2/r2/lib/app_globals.py | 8 +- r2/r2/lib/cache.py | 21 +- r2/r2/lib/contrib/memcache.py | 402 +++++++++++++++++++++------------- 3 files changed, 273 insertions(+), 158 deletions(-) mode change 100644 => 100755 r2/r2/lib/contrib/memcache.py diff --git a/r2/r2/lib/app_globals.py b/r2/r2/lib/app_globals.py index 76b032cd6..f488b10c9 100644 --- a/r2/r2/lib/app_globals.py +++ b/r2/r2/lib/app_globals.py @@ -105,13 +105,13 @@ class Globals(object): setattr(self, k, v) # initialize caches - mc = Memcache(self.memcaches) + mc = Memcache(self.memcaches, pickleProtocol = 1) self.cache = CacheChain((LocalCache(), mc)) - self.permacache = Memcache(self.permacaches) - self.rendercache = Memcache(self.rendercaches) + self.permacache = Memcache(self.permacaches, pickleProtocol = 1) + self.rendercache = Memcache(self.rendercaches, pickleProtocol = 1) self.make_lock = make_lock_factory(mc) - self.rec_cache = Memcache(self.rec_cache) + self.rec_cache = Memcache(self.rec_cache, pickleProtocol = 1) # set default time zone if one is not set self.tz = pytz.timezone(global_conf.get('timezone')) diff --git a/r2/r2/lib/cache.py b/r2/r2/lib/cache.py index 4f52e8121..bbc597bcf 100644 --- a/r2/r2/lib/cache.py +++ b/r2/r2/lib/cache.py @@ -72,7 +72,7 @@ class Memcache(CacheUtils, memcache.Client): memcache.Client.delete(self, key, time=time) def delete_multi(self, keys, prefix='', time=0): - memcache.Client.delete_multi(self, keys, seconds = time, + memcache.Client.delete_multi(self, keys, time = time, key_prefix = prefix) class LocalCache(dict, CacheUtils): @@ -119,11 +119,23 @@ class LocalCache(dict, CacheUtils): def incr(self, key, amt=1): if self.has_key(key): - self[key] += amt + self[key] = int(self[key]) + amt def decr(self, key, amt=1): if self.has_key(key): - self[key] -= amt + self[key] = int(self[key]) - amt + + def append(self, key, val, time = 0): + if self.has_key(key): + self[key] = str(self[key]) + val + + def prepend(self, key, val, time = 0): + if self.has_key(key): + self[key] = val + str(self[key]) + + def replace(self, key, val, time = 0): + if self.has_key(key): + self[key] = val def flush_all(self): self.clear() @@ -139,6 +151,9 @@ class CacheChain(CacheUtils, local): return fn set = make_set_fn('set') + append = make_set_fn('append') + prepend = make_set_fn('prepend') + replace = make_set_fn('replace') set_multi = make_set_fn('set_multi') add = make_set_fn('add') incr = make_set_fn('incr') diff --git a/r2/r2/lib/contrib/memcache.py b/r2/r2/lib/contrib/memcache.py old mode 100644 new mode 100755 index 3ae5b960b..2c30141fd --- a/r2/r2/lib/contrib/memcache.py +++ b/r2/r2/lib/contrib/memcache.py @@ -46,8 +46,10 @@ More detailed documentation is available in the L{Client} class. import sys import socket import time -import types +import os from md5 import md5 +import re +import types try: import cPickle as pickle except ImportError: @@ -62,11 +64,16 @@ except ImportError: def decompress(val): raise _Error("received compressed data but I don't support compession (import error)") +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + from binascii import crc32 # zlib version is not cross-platform serverHashFunction = crc32 __author__ = "Evan Martin " -__version__ = "1.36" +__version__ = "1.43" __copyright__ = "Copyright (C) 2003 Danga Interactive" __license__ = "Python" @@ -126,19 +133,35 @@ class Client(local): class MemcachedStringEncodingError(Exception): pass - def __init__(self, servers, debug=0): + def __init__(self, servers, debug=0, pickleProtocol=0, + pickler=pickle.Pickler, unpickler=pickle.Unpickler, + pload=None, pid=None): """ Create a new Client object with the given list of servers. @param servers: C{servers} is passed to L{set_servers}. @param debug: whether to display error messages when a server can't be contacted. + @param pickleProtocol: number to mandate protocol used by (c)Pickle. + @param pickler: optional override of default Pickler to allow subclassing. + @param unpickler: optional override of default Unpickler to allow subclassing. + @param pload: optional persistent_load function to call on pickle loading. + Useful for cPickle since subclassing isn't allowed. + @param pid: optional persistent_id function to call on pickle storing. + Useful for cPickle since subclassing isn't allowed. """ local.__init__(self) self.set_servers(servers) self.debug = debug self.stats = {} + # Allow users to modify pickling/unpickling behavior + self.pickleProtocol = pickleProtocol + self.pickler = pickler + self.unpickler = unpickler + self.persistent_load = pload + self.persistent_id = pid + def set_servers(self, servers): """ Set the pool of servers used by this client. @@ -239,7 +262,7 @@ class Client(local): for s in self.servers: s.close_socket() - def delete_multi(self, keys, seconds=0, key_prefix=''): + def delete_multi(self, keys, time=0, key_prefix=''): ''' Delete multiple keys in the memcache doing just one query. @@ -257,7 +280,7 @@ class Client(local): the next one. @param keys: An iterable of keys to clear - @param seconds: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay. + @param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay. @param key_prefix: Optional string to prepend to each key when sending to memcache. See docs for L{get_multi} and L{set_multi}. @@ -279,7 +302,7 @@ class Client(local): write = bigcmd.append if time != None: for key in server_keys[server]: # These are mangled keys - write("delete %s %d\r\n" % (key, seconds)) + write("delete %s %d\r\n" % (key, time)) else: for key in server_keys[server]: # These are mangled keys write("delete %s\r\n" % key) @@ -287,7 +310,8 @@ class Client(local): server.send_cmds(''.join(bigcmd)) except socket.error, msg: rc = 0 - server.mark_dead(msg[1]) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) dead_servers.append(server) # if any servers died on the way, don't expect them to respond. @@ -300,6 +324,7 @@ class Client(local): for key in keys: server.expect("DELETED") except socket.error, msg: + if type(msg) is types.TupleType: msg = msg[1] server.mark_dead(msg) rc = 0 return rc @@ -308,7 +333,7 @@ class Client(local): '''Deletes a key from the memcache. @return: Nonzero on success. - @param seconds: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay. + @param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay. @rtype: int ''' server, key = self._get_server(key) @@ -325,7 +350,8 @@ class Client(local): server.send_cmd(cmd) server.expect("DELETED") except socket.error, msg: - server.mark_dead(msg[1]) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) return 0 return 1 @@ -378,10 +404,11 @@ class Client(local): line = server.readline() return int(line) except socket.error, msg: - server.mark_dead(msg[1]) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) return None - def add(self, key, val, time=0): + def add(self, key, val, time = 0, min_compress_len = 0): ''' Add new key with value. @@ -390,7 +417,30 @@ class Client(local): @return: Nonzero on success. @rtype: int ''' - return self._set("add", key, val, time) + return self._set("add", key, val, time, min_compress_len) + + def append(self, key, val, time=0, min_compress_len=0): + '''Append the value to the end of the existing key's value. + + Only stores in memcache if key already exists. + Also see L{prepend}. + + @return: Nonzero on success. + @rtype: int + ''' + return self._set("append", key, val, time, min_compress_len) + + def prepend(self, key, val, time=0, min_compress_len=0): + '''Prepend the value to the beginning of the existing key's value. + + Only stores in memcache if key already exists. + Also see L{append}. + + @return: Nonzero on success. + @rtype: int + ''' + return self._set("prepend", key, val, time, min_compress_len) + def replace(self, key, val, time=0, min_compress_len=0): '''Replace existing key with value. @@ -401,14 +451,16 @@ class Client(local): @rtype: int ''' return self._set("replace", key, val, time, min_compress_len) + def set(self, key, val, time=0, min_compress_len=0): '''Unconditionally sets a key to a given value in the memcache. - The C{key} can optionally be an tuple, with the first element being the - hash value, if you want to avoid making this module calculate a hash value. - You may prefer, for example, to keep all of a given user's objects on the - same memcache server, so you could use the user's unique id as the hash - value. + The C{key} can optionally be an tuple, with the first element + being the server hash value and the second being the key. + If you want to avoid making this module calculate a hash value. + You may prefer, for example, to keep all of a given user's objects + on the same memcache server, so you could use the user's unique + id as the hash value. @return: Nonzero on success. @rtype: int @@ -530,13 +582,17 @@ class Client(local): write("set %s %d %d %d\r\n%s\r\n" % (key, store_info[0], time, store_info[1], store_info[2])) server.send_cmds(''.join(bigcmd)) except socket.error, msg: - server.mark_dead(msg[1]) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) dead_servers.append(server) # if any servers died on the way, don't expect them to respond. for server in dead_servers: del server_keys[server] + # short-circuit if there are no servers, just return all keys + if not server_keys: return(mapping.keys()) + notstored = [] # original keys. for server, keys in server_keys.iteritems(): try: @@ -547,11 +603,11 @@ class Client(local): else: notstored.append(prefixed_to_orig_key[key]) #un-mangle. except (_Error, socket.error), msg: + if type(msg) is types.TupleType: msg = msg[1] server.mark_dead(msg) return notstored - @staticmethod - def _val_to_store_info(val, min_compress_len): + def _val_to_store_info(self, val, min_compress_len): """ Transform val to a storable representation, returning a tuple of the flags, the length of the new value, and the new value itself. """ @@ -570,7 +626,12 @@ class Client(local): min_compress_len = 0 else: flags |= Client._FLAG_PICKLE - val = pickle.dumps(val, 1) # Ack! JLR hacks it so that LinkedDict unpicling works w/o figuring out __reduce__. + file = StringIO() + pickler = self.pickler(file, protocol=self.pickleProtocol) + if self.persistent_id: + pickler.persistent_id = self.persistent_id + pickler.dump(val) + val = file.getvalue() # silently do not store if value length exceeds maximum if len(val) >= SERVER_MAX_VALUE_LENGTH: @@ -587,7 +648,7 @@ class Client(local): return (flags, len(val), val) - def _set(self, cmd, key, val, time, min_compress_len=0): + def _set(self, cmd, key, val, time, min_compress_len = 0): server, key = self._get_server(key) key = check_key(key) if not server: @@ -604,7 +665,8 @@ class Client(local): server.send_cmd(fullcmd) return(server.expect("STORED") == "STORED") except socket.error, msg: - server.mark_dead(msg[1]) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) return 0 def get(self, key): @@ -627,8 +689,7 @@ class Client(local): value = self._recv_value(server, flags, rlen) server.expect("END") except (_Error, socket.error), msg: - if type(msg) is types.TupleType: - msg = msg[1] + if type(msg) is types.TupleType: msg = msg[1] server.mark_dead(msg) return None return value @@ -682,7 +743,8 @@ class Client(local): try: server.send_cmd("get %s" % " ".join(server_keys[server])) except socket.error, msg: - server.mark_dead(msg[1]) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) dead_servers.append(server) # if any servers died on the way, don't expect them to respond. @@ -701,8 +763,9 @@ class Client(local): retvals[prefixed_to_orig_key[rkey]] = val # un-prefix returned key. line = server.readline() except (_Error, socket.error), msg: - server.mark_dead(msg) + if type(msg) is types.TupleType: msg = msg[1] + server.mark_dead(msg) return retvals def _expectvalue(self, server, line=None): @@ -739,9 +802,13 @@ class Client(local): val = long(buf) elif flags & Client._FLAG_PICKLE: try: - val = pickle.loads(buf) - except: - self.debuglog('Pickle error...\n') + file = StringIO(buf) + unpickler = self.unpickler(file) + if self.persistent_load: + unpickler.persistent_load = self.persistent_load + val = unpickler.load() + except Exception, e: + self.debuglog('Pickle error: %s\n' % e) val = None else: self.debuglog("unknown flags on get: %x\n" % flags) @@ -751,6 +818,7 @@ class Client(local): class _Host: _DEAD_RETRY = 1 # number of seconds before retrying a dead server. + _SOCKET_TIMEOUT = 3 # number of seconds before sockets timeout. def __init__(self, host, debugfunc=None): if isinstance(host, types.TupleType): @@ -758,11 +826,24 @@ class _Host: else: self.weight = 1 - if host.find(":") > 0: - self.ip, self.port = host.split(":") - self.port = int(self.port) + # parse the connection string + m = re.match(r'^(?Punix):(?P.*)$', host) + if not m: + m = re.match(r'^(?Pinet):' + r'(?P[^:]+)(:(?P[0-9]+))?$', host) + if not m: m = re.match(r'^(?P[^:]+):(?P[0-9]+)$', host) + if not m: + raise ValueError('Unable to parse connection string: "%s"' % host) + + hostData = m.groupdict() + if hostData.get('proto') == 'unix': + self.family = socket.AF_UNIX + self.address = hostData['path'] else: - self.ip, self.port = host, 11211 + self.family = socket.AF_INET + self.ip = hostData['host'] + self.port = int(hostData.get('port', 11211)) + self.address = ( self.ip, self.port ) if not debugfunc: debugfunc = lambda x: x @@ -794,11 +875,15 @@ class _Host: return None if self.socket: return self.socket - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # Python 2.3-ism: s.settimeout(1) + s = socket.socket(self.family, socket.SOCK_STREAM) + if hasattr(s, 'settimeout'): s.settimeout(self._SOCKET_TIMEOUT) try: - s.connect((self.ip, self.port)) + s.connect(self.address) + except socket.timeout, msg: + self.mark_dead("connect: %s" % msg) + return None except socket.error, msg: + if type(msg) is types.TupleType: msg = msg[1] self.mark_dead("connect: %s" % msg[1]) return None self.socket = s @@ -859,7 +944,11 @@ class _Host: d = '' if self.deaduntil: d = " (dead until %d)" % self.deaduntil - return "%s:%d%s" % (self.ip, self.port, d) + + if self.family == socket.AF_INET: + return "inet:%s:%d%s" % (self.address[0], self.address[1], d) + else: + return "unix:%s%s" % (self.address, d) def check_key(key, key_extra_len=0): """Checks sanity of key. Fails if: @@ -867,19 +956,12 @@ def check_key(key, key_extra_len=0): Contains control characters (Raises MemcachedKeyCharacterError). Is not a string (Raises MemcachedStringEncodingError) """ + if type(key) == types.TupleType: key = key[1] if not isinstance(key, str): raise Client.MemcachedStringEncodingError, ("Keys must be str()'s, not" "unicode. Convert your unicode strings using " "mystring.encode(charset)!") - #if isinstance(key, basestring): - #if len(key) + key_extra_len > SERVER_MAX_KEY_LENGTH: - #raise Client.MemcachedKeyLengthError, ("Key length is > %s" - #% SERVER_MAX_KEY_LENGTH) - #for char in key: - #if ord(char) < 32: - #raise Client.MemcachedKeyCharacterError, "Control characters not allowed" - return md5(key).hexdigest() def _doctest(): @@ -894,125 +976,143 @@ if __name__ == "__main__": _doctest() print "Running tests:" print - #servers = ["127.0.0.1:11211", "127.0.0.1:11212"] - servers = ["127.0.0.1:11211"] - mc = Client(servers, debug=1) + serverList = [["127.0.0.1:11211"]] + if '--do-unix' in sys.argv: + serverList.append([os.path.join(os.getcwd(), 'memcached.socket')]) - def to_s(val): - if not isinstance(val, types.StringTypes): - return "%s (%s)" % (val, type(val)) - return "%s" % val - def test_setget(key, val): - print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)), - mc.set(key, val) - newval = mc.get(key) - if newval == val: - print "OK" - return 1 - else: - print "FAIL" - return 0 + for servers in serverList: + mc = Client(servers, debug=1) + + def to_s(val): + if not isinstance(val, types.StringTypes): + return "%s (%s)" % (val, type(val)) + return "%s" % val + def test_setget(key, val): + print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)), + mc.set(key, val) + newval = mc.get(key) + if newval == val: + print "OK" + return 1 + else: + print "FAIL" + return 0 - class FooStruct: - def __init__(self): - self.bar = "baz" - def __str__(self): - return "A FooStruct" - def __eq__(self, other): - if isinstance(other, FooStruct): - return self.bar == other.bar - return 0 + class FooStruct: + def __init__(self): + self.bar = "baz" + def __str__(self): + return "A FooStruct" + def __eq__(self, other): + if isinstance(other, FooStruct): + return self.bar == other.bar + return 0 - test_setget("a_string", "some random string") - test_setget("an_integer", 42) - if test_setget("long", long(1<<30)): - print "Testing delete ...", - if mc.delete("long"): + test_setget("a_string", "some random string") + test_setget("an_integer", 42) + if test_setget("long", long(1<<30)): + print "Testing delete ...", + if mc.delete("long"): + print "OK" + else: + print "FAIL" + print "Testing get_multi ...", + print mc.get_multi(["a_string", "an_integer"]) + + print "Testing get(unknown value) ...", + print to_s(mc.get("unknown_value")) + + f = FooStruct() + test_setget("foostruct", f) + + print "Testing incr ...", + x = mc.incr("an_integer", 1) + if x == 43: print "OK" else: print "FAIL" - print "Testing get_multi ...", - print mc.get_multi(["a_string", "an_integer"]) - print "Testing get(unknown value) ...", - print to_s(mc.get("unknown_value")) + print "Testing decr ...", + x = mc.decr("an_integer", 1) + if x == 42: + print "OK" + else: + print "FAIL" - f = FooStruct() - test_setget("foostruct", f) + # sanity tests + print "Testing sending spaces...", + try: + x = mc.set("this has spaces", 1) + except Client.MemcachedKeyCharacterError, msg: + print "OK" + else: + print "FAIL" - print "Testing incr ...", - x = mc.incr("an_integer", 1) - if x == 43: - print "OK" - else: - print "FAIL" + print "Testing sending control characters...", + try: + x = mc.set("this\x10has\x11control characters\x02", 1) + except Client.MemcachedKeyCharacterError, msg: + print "OK" + else: + print "FAIL" - print "Testing decr ...", - x = mc.decr("an_integer", 1) - if x == 42: - print "OK" - else: - print "FAIL" + print "Testing using insanely long key...", + try: + x = mc.set('a'*SERVER_MAX_KEY_LENGTH + 'aaaa', 1) + except Client.MemcachedKeyLengthError, msg: + print "OK" + else: + print "FAIL" - # sanity tests - print "Testing sending spaces...", - try: - x = mc.set("this has spaces", 1) - except Client.MemcachedKeyCharacterError, msg: - print "OK" - else: - print "FAIL" + print "Testing sending a unicode-string key...", + try: + x = mc.set(u'keyhere', 1) + except Client.MemcachedStringEncodingError, msg: + print "OK", + else: + print "FAIL", + try: + x = mc.set((u'a'*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1) + except: + print "FAIL", + else: + print "OK", + import pickle + s = pickle.loads('V\\u4f1a\np0\n.') + try: + x = mc.set((s*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1) + except Client.MemcachedKeyLengthError: + print "OK" + else: + print "FAIL" - print "Testing sending control characters...", - try: - x = mc.set("this\x10has\x11control characters\x02", 1) - except Client.MemcachedKeyCharacterError, msg: - print "OK" - else: - print "FAIL" + print "Testing using a value larger than the memcached value limit...", + x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH) + if mc.get('keyhere') == None: + print "OK", + else: + print "FAIL", + x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH + 'aaa') + if mc.get('keyhere') == None: + print "OK" + else: + print "FAIL" - print "Testing using insanely long key...", - try: - x = mc.set('a'*SERVER_MAX_KEY_LENGTH + 'aaaa', 1) - except Client.MemcachedKeyLengthError, msg: - print "OK" - else: - print "FAIL" - - print "Testing sending a unicode-string key...", - try: - x = mc.set(u'keyhere', 1) - except Client.MemcachedStringEncodingError, msg: - print "OK", - else: - print "FAIL", - try: - x = mc.set((u'a'*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1) - except: - print "FAIL", - else: - print "OK", - import pickle - s = pickle.loads('V\\u4f1a\np0\n.') - try: - x = mc.set((s*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1) - except Client.MemcachedKeyLengthError: - print "OK" - else: - print "FAIL" - - print "Testing using a value larger than the memcached value limit...", - x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH) - if mc.get('keyhere') == None: - print "OK", - else: - print "FAIL", - x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH + 'aaa') - if mc.get('keyhere') == None: - print "OK" - else: - print "FAIL" + print "Testing set_multi() with no memcacheds running", + mc.disconnect_all() + errors = mc.set_multi({'keyhere' : 'a', 'keythere' : 'b'}) + if errors != []: + print "FAIL" + else: + print "OK" + print "Testing delete_multi() with no memcacheds running", + mc.disconnect_all() + ret = mc.delete_multi({'keyhere' : 'a', 'keythere' : 'b'}) + if ret != 1: + print "FAIL" + else: + print "OK" # vim: ts=4 sw=4 et :