diff --git a/r2/r2/lib/db/tdb_cassandra.py b/r2/r2/lib/db/tdb_cassandra.py index d4bf79e74..a34b1bf36 100644 --- a/r2/r2/lib/db/tdb_cassandra.py +++ b/r2/r2/lib/db/tdb_cassandra.py @@ -26,12 +26,11 @@ from pylons import g from pycassa import ColumnFamily from pycassa.cassandra.ttypes import ConsistencyLevel, NotFoundException -from pycassa.system_manager import SystemManager, UTF8_TYPE, COUNTER_COLUMN_TYPE - +from pycassa.system_manager import SystemManager, UTF8_TYPE, COUNTER_COLUMN_TYPE, TIME_UUID_TYPE from r2.lib.utils import tup, Storage from r2.lib.db.sorts import epoch_seconds from r2.lib import cache -from uuid import uuid1 +from uuid import uuid1, UUID from itertools import chain import cPickle as pickle @@ -282,7 +281,7 @@ class ThingBase(object): raise TdbException("Cannot make instances of %r" % (self.__class__,)) @classmethod - def _byID(cls, ids, properties=None): + def _byID(cls, ids, return_dict=True, properties=None): ids, is_single = tup(ids, True) if properties is not None: @@ -295,7 +294,7 @@ class ThingBase(object): return {} # all keys must be strings or directly convertable to strings - assert all(isinstance(_id, basestring) and str(_id) for _id in ids) + assert all(isinstance(_id, basestring) or str(_id) for _id in ids) def reject_bad_partials(cached, still_need): # tell sgm that the match it found in the cache isn't good @@ -356,8 +355,10 @@ class ThingBase(object): elif is_single: assert len(ret) == 1 return ret.values()[0] - - return ret + elif return_dict: + return ret + else: + return filter(None, (ret.get(i) for i in ids)) @property def _fullname(self): @@ -368,7 +369,7 @@ class ThingBase(object): return '%s_%s' % (self._type_prefix, self._id) @classmethod - def _by_fullname(cls, fnames): + def _by_fullname(cls, fnames, return_dict=True): ids, is_single = tup(fnames, True) by_cls = {} @@ -382,8 +383,13 @@ class ThingBase(object): for typ, ids in by_cls.iteritems(): items.extend(typ._byID(ids).values()) - return items[0] if is_single else dict((x._fullname, x) - for x in items) + if is_single: + return items[0] + elif return_dict: + return dict((x._fullname, x) for x in items) + else: + d = dict((x._fullname, x) for x in items) + return [d[fullname] for fullname in fnames] @classmethod def _cache_prefix(cls): @@ -717,6 +723,34 @@ class ThingBase(object): class Thing(ThingBase): _timestamp_prop = 'date' +class UuidThing(ThingBase): + _timestamp_prop = 'date' + _extra_schema_creation_args = { + 'key_validation_class': TIME_UUID_TYPE + } + + def __init__(self, **kw): + ThingBase.__init__(self, _id=uuid1(), **kw) + + @classmethod + def _byID(cls, ids, **kw): + ids, is_single = tup(ids, ret_is_single=True) + + #Convert string ids to UUIDs before retrieving + uuids = [UUID(id) if not isinstance(id, UUID) else id for id in ids] + + if len(uuids) == 0: + return {} + elif is_single: + assert len(uuids) == 1 + uuids = uuids[0] + + return super(UuidThing, cls)._byID(uuids, **kw) + + @classmethod + def _cache_key_id(cls, t_id): + return cls._cache_prefix() + str(t_id) + class Relation(ThingBase): _timestamp_prop = 'date' @@ -822,6 +856,96 @@ class Relation(ThingBase): # ick return str(long(cls._serialize_date(date))) +class ColumnQuery(object): + """ + A query across a row of a CF. + """ + _chunk_size = 100 + + def __init__(self, cls, rowkey, column_start="", column_finish="", + column_count=100, column_reversed=True, + column_to_obj=None, + obj_to_column=None): + self.cls = cls + self.rowkey = rowkey + self.column_start = column_start + self.column_finish = column_finish + self._limit = column_count + self.column_reversed = column_reversed + self.column_to_obj = column_to_obj or self.default_column_to_obj + self.obj_to_column = obj_to_column or self.default_obj_to_column + self._rules = [] # dummy parameter to mimic tdb_sql queries + + @staticmethod + def default_column_to_obj(columns): + """ + Mapping from column --> object. + + This default doesn't actually return the underlying object but we don't + know how to do that without more information. + """ + return columns + + @staticmethod + def default_obj_to_column(objs): + """ + Mapping from object --> column + """ + objs, is_single = tup(objs, ret_is_single=True) + columns = [{obj._id: obj._id} for obj in objs] + + if is_single: + return columns[0] + else: + return columns + + def _after(self, thing): + if thing: + column_name = self.obj_to_column(thing).keys()[0] + self.column_start = column_name + else: + self.column_start = "" + + def _after_id(self, column_name): + self.column_start = column_name + + def _reverse(self): + # Logic of standard reddit query is opposite of cassandra + self.column_reversed = False + + def __iter__(self): + # Get the max number of columns we could grab in this query + total_columns = self.cls._cf.get_count(self.rowkey, + column_start=self.column_start, + column_finish=self.column_finish) + retrievable_columns = min(total_columns, self._limit) + + retrieved = 0 + column_start = self.column_start + while retrieved <= retrievable_columns: + try: + r = self.cls._cf.get(self.rowkey, column_start=column_start, + column_finish=self.column_finish, + column_count=self._chunk_size, + column_reversed=self.column_reversed) + except NotFoundException: + return + + retrieved += self._chunk_size + columns = [{col_name: r[col_name]} for col_name in r if col_name != column_start] + if not columns: + return + column_start = columns[-1].keys()[0] + objs = self.column_to_obj(columns) + + objs, is_single = tup(objs, ret_is_single=True) + for obj in objs: + yield obj + + def __repr__(self): + return "<%s(%s-%r)>" % (self.__class__.__name__, self.cls.__name__, + self.rowkey) + class Query(object): """A query across a CF. Note that while you can query rows from a CF that has a RandomPartitioner, you won't get them in any sort @@ -905,32 +1029,79 @@ class Query(object): class View(ThingBase): # Views are Things like any other, but may have special key - # characteristics + # characteristics. Uses ColumnQuery for queries across a row. - # these default to not having a timestamp column _timestamp_prop = None - _value_type = 'str' + _compare_with = UTF8_TYPE # Type of the columns - should match _key_validation_class of _view_of class + _view_of = None + _write_consistency_level = CL.ONE # Is this necessary? + _query_cls = ColumnQuery + + @classmethod + def _rowkey(cls, obj): + """Mapping from _view_of object --> view rowkey. No default + implementation is provided because this is the fundamental aspect of the + view.""" + raise NotImplementedError + + @classmethod + def _obj_to_column(cls, objs): + """Mapping from _view_of object --> view column. Returns a + single item dict {column name:column value} or list of dicts.""" + objs, is_single = tup(objs, ret_is_single=True) + + columns = [{obj._id: obj._id} for obj in objs] + + if len(columns) == 1: + return columns[0] + else: + return columns + + @classmethod + def _column_to_obj(cls, columns): + """Mapping from view column --> _view_of object. Must be complement to + _obj_to_column().""" + columns, is_single = tup(columns, ret_is_single=True) + + ids = [column.keys()[0] for column in columns] + + if len(ids) == 1: + ids = ids[0] + return cls._view_of._byID(ids, return_dict=False) + + @classmethod + def add_object(cls, obj, **kw): + """Add a lookup to the view""" + rowkey = cls._rowkey(obj) + column = cls._obj_to_column(obj) + cls._set_values(rowkey, column, **kw) + + @classmethod + def query(cls, rowkey, after=None, reverse=False, count=1000): + """Return a query to get objects from the underlying _view_of class.""" + + column_reversed = not reverse # Reverse convention for cassandra is opposite + + q = cls._query_cls(cls, rowkey, column_count=count, + column_reversed=column_reversed, + column_to_obj=cls._column_to_obj, + obj_to_column=cls._obj_to_column) + q._after(after) + return q + def _values(self): """Retrieve the entire contents of the view""" # TODO: at present this only grabs max_column_count columns return self._t - @staticmethod - def _gen_uuid(): - """Convenience method for generating UUIDs for view - keys. Generates time-based UUIDs, safe for use as TimeUUID - indices in Cassandra""" - - return uuid1() - @classmethod @will_write def _set_values(cls, row_key, col_values, write_consistency_level = None, ttl=None): - """Set a set of column values in a row of a View without + """Set a set of column values in a row of a view without looking up the whole row first""" # col_values =:= dict(col_name -> col_value) @@ -942,16 +1113,14 @@ class View(ThingBase): # based on the _default_ttls class dict. Note! There is no way # to use this API to express that you don't want a TTL if # there is a default set on either the row or the column - default_ttl = None if ttl is None else self._ttl + default_ttl = ttl or cls._ttl with cls._cf.batch(write_consistency_level = cls._wcl(write_consistency_level)) as b: # with some quick tweaks we could have a version that # operates across multiple row keys, but this is not it for k, v in updates.iteritems(): - ttl = cls._default_ttls.get(k, default_ttl) - b.insert(row_key, - {k: v}, - ttl = cls._default_ttls.get(k, default_ttl)) + b.insert(row_key, {k: v}, + ttl=cls._default_ttls.get(k, default_ttl)) # can we be smarter here? thing_cache.delete(cls._cache_key_id(row_key))