Allow UUIDs for Cassandra rowkeys and column names.

Also add a query that iterates over columns.
This commit is contained in:
bsimpson63
2011-10-21 16:11:48 -07:00
parent 8de8c133f1
commit eb74ad4714

View File

@@ -26,12 +26,11 @@ from pylons import g
from pycassa import ColumnFamily
from pycassa.cassandra.ttypes import ConsistencyLevel, NotFoundException
from pycassa.system_manager import SystemManager, UTF8_TYPE, COUNTER_COLUMN_TYPE
from pycassa.system_manager import SystemManager, UTF8_TYPE, COUNTER_COLUMN_TYPE, TIME_UUID_TYPE
from r2.lib.utils import tup, Storage
from r2.lib.db.sorts import epoch_seconds
from r2.lib import cache
from uuid import uuid1
from uuid import uuid1, UUID
from itertools import chain
import cPickle as pickle
@@ -282,7 +281,7 @@ class ThingBase(object):
raise TdbException("Cannot make instances of %r" % (self.__class__,))
@classmethod
def _byID(cls, ids, properties=None):
def _byID(cls, ids, return_dict=True, properties=None):
ids, is_single = tup(ids, True)
if properties is not None:
@@ -295,7 +294,7 @@ class ThingBase(object):
return {}
# all keys must be strings or directly convertible to strings
assert all(isinstance(_id, basestring) and str(_id) for _id in ids)
assert all(isinstance(_id, basestring) or str(_id) for _id in ids)
def reject_bad_partials(cached, still_need):
# tell sgm that the match it found in the cache isn't good
@@ -356,8 +355,10 @@ class ThingBase(object):
elif is_single:
assert len(ret) == 1
return ret.values()[0]
return ret
elif return_dict:
return ret
else:
return filter(None, (ret.get(i) for i in ids))
@property
def _fullname(self):
@@ -368,7 +369,7 @@ class ThingBase(object):
return '%s_%s' % (self._type_prefix, self._id)
@classmethod
def _by_fullname(cls, fnames):
def _by_fullname(cls, fnames, return_dict=True):
ids, is_single = tup(fnames, True)
by_cls = {}
@@ -382,8 +383,13 @@ class ThingBase(object):
for typ, ids in by_cls.iteritems():
items.extend(typ._byID(ids).values())
return items[0] if is_single else dict((x._fullname, x)
for x in items)
if is_single:
return items[0]
elif return_dict:
return dict((x._fullname, x) for x in items)
else:
d = dict((x._fullname, x) for x in items)
return [d[fullname] for fullname in fnames]
@classmethod
def _cache_prefix(cls):
@@ -717,6 +723,34 @@ class ThingBase(object):
class Thing(ThingBase):
_timestamp_prop = 'date'
class UuidThing(ThingBase):
    """A Thing keyed by a time-based UUID.

    The column family is created with a TimeUUID key validator, and each
    new instance is assigned a freshly generated ``uuid1()`` as its ``_id``.
    """

    _timestamp_prop = 'date'
    _extra_schema_creation_args = {
        'key_validation_class': TIME_UUID_TYPE
    }

    def __init__(self, **kw):
        """Build the thing, always generating a new uuid1 for its ``_id``."""
        ThingBase.__init__(self, _id=uuid1(), **kw)

    @classmethod
    def _byID(cls, ids, **kw):
        """Fetch things by id, accepting UUID objects or UUID strings.

        ``ids`` may be a single id or a collection of ids.  Anything that
        is not already a ``UUID`` is passed through ``UUID(...)`` before
        the lookup is delegated to the parent class.  An empty collection
        returns ``{}`` without touching the parent.  Extra keyword
        arguments are forwarded to the parent ``_byID`` unchanged.
        """
        ids, is_single = tup(ids, ret_is_single=True)

        # Normalise every id to a UUID instance before retrieving
        uuids = []
        for raw_id in ids:
            if isinstance(raw_id, UUID):
                uuids.append(raw_id)
            else:
                uuids.append(UUID(raw_id))

        if not uuids:
            return {}

        if is_single:
            # hand the parent a bare id so it also sees a single lookup
            assert len(uuids) == 1
            uuids = uuids[0]

        return super(UuidThing, cls)._byID(uuids, **kw)

    @classmethod
    def _cache_key_id(cls, t_id):
        """Return the cache key for ``t_id``: the class cache prefix
        followed by the string form of the id."""
        prefix = cls._cache_prefix()
        return prefix + str(t_id)
class Relation(ThingBase):
_timestamp_prop = 'date'
@@ -822,6 +856,96 @@ class Relation(ThingBase):
# ick
return str(long(cls._serialize_date(date)))
class ColumnQuery(object):
    """
    A query across a row of a CF.

    Iterating the query reads the columns of one row in chunks of
    ``_chunk_size`` and yields whatever ``column_to_obj`` produces for
    them, stopping after at most ``column_count`` columns.
    """
    _chunk_size = 100

    def __init__(self, cls, rowkey, column_start="", column_finish="",
                 column_count=100, column_reversed=True,
                 column_to_obj=None,
                 obj_to_column=None):
        """
        cls             -- class whose ``_cf`` column family is queried
        rowkey          -- key of the row to read columns from
        column_start    -- column name to start from ("" for the row edge)
        column_finish   -- column name to stop at ("" for the row edge)
        column_count    -- maximum number of columns to return
        column_reversed -- passed through as the ``column_reversed``
                           argument of the column family ``get``
        column_to_obj   -- callable mapping a list of single-entry
                           {name: value} dicts to objects
        obj_to_column   -- callable mapping an object (or objects) to
                           single-entry {name: value} dict(s)
        """
        self.cls = cls
        self.rowkey = rowkey
        self.column_start = column_start
        self.column_finish = column_finish
        self._limit = column_count
        self.column_reversed = column_reversed
        self.column_to_obj = column_to_obj or self.default_column_to_obj
        self.obj_to_column = obj_to_column or self.default_obj_to_column
        self._rules = []    # dummy parameter to mimic tdb_sql queries

    @staticmethod
    def default_column_to_obj(columns):
        """
        Mapping from column --> object.

        This default doesn't actually return the underlying object but we
        don't know how to do that without more information.
        """
        return columns

    @staticmethod
    def default_obj_to_column(objs):
        """
        Mapping from object --> column.

        Each object becomes a ``{obj._id: obj._id}`` dict.  A single object
        yields a single dict, a collection yields a list of dicts.
        """
        objs, is_single = tup(objs, ret_is_single=True)
        columns = [{obj._id: obj._id} for obj in objs]

        if is_single:
            return columns[0]
        else:
            return columns

    def _after(self, thing):
        """Start the query from the column that represents ``thing``; a
        false ``thing`` resets the start to the edge of the row."""
        if thing:
            # obj_to_column gives a single-entry dict; its key is the name
            column_name = next(iter(self.obj_to_column(thing)))
            self.column_start = column_name
        else:
            self.column_start = ""

    def _after_id(self, column_name):
        """Start the query from the column called ``column_name``."""
        self.column_start = column_name

    def _reverse(self):
        # Logic of standard reddit query is opposite of cassandra
        self.column_reversed = False

    def __iter__(self):
        """Yield the objects for up to ``column_count`` columns of the row.

        Stops early if the row is missing (``NotFoundException``) or when
        a chunk contains no columns beyond the current start column.
        """
        cf = self.cls._cf

        # Get the max number of columns we could grab in this query
        total_columns = cf.get_count(self.rowkey,
                                     column_start=self.column_start,
                                     column_finish=self.column_finish)
        remaining = min(total_columns, self._limit)
        column_start = self.column_start

        while remaining > 0:
            # The start column comes back with the chunk and is dropped
            # below, so ask for one extra column to make up for it.
            column_count = min(self._chunk_size, remaining) + 1
            try:
                r = cf.get(self.rowkey, column_start=column_start,
                           column_finish=self.column_finish,
                           column_count=column_count,
                           column_reversed=self.column_reversed)
            except NotFoundException:
                return

            columns = [{col_name: r[col_name]}
                       for col_name in r if col_name != column_start]
            # never hand back more columns than the caller asked for
            columns = columns[:remaining]

            if not columns:
                return

            remaining -= len(columns)
            # resume the next chunk from the last column seen in this one
            column_start = next(iter(columns[-1]))

            objs = self.column_to_obj(columns)
            objs, is_single = tup(objs, ret_is_single=True)
            for obj in objs:
                yield obj

    def __repr__(self):
        return "<%s(%s-%r)>" % (self.__class__.__name__, self.cls.__name__,
                                self.rowkey)
class Query(object):
"""A query across a CF. Note that while you can query rows from a
CF that has a RandomPartitioner, you won't get them in any sort
@@ -905,32 +1029,79 @@ class Query(object):
class View(ThingBase):
# Views are Things like any other, but may have special key
# characteristics
# characteristics. Uses ColumnQuery for queries across a row.
# these default to not having a timestamp column
_timestamp_prop = None
_value_type = 'str'
_compare_with = UTF8_TYPE # Type of the columns - should match _key_validation_class of _view_of class
_view_of = None
_write_consistency_level = CL.ONE # Is this necessary?
_query_cls = ColumnQuery
@classmethod
def _rowkey(cls, obj):
    """Return the view rowkey under which ``obj`` (a _view_of object)
    is stored.

    There is deliberately no default: choosing the rowkey is the
    fundamental aspect of a view, so subclasses must provide it.
    """
    raise NotImplementedError
@classmethod
def _obj_to_column(cls, objs):
    """Map _view_of object(s) to view column(s).

    Every object becomes a single-entry dict ``{obj._id: obj._id}``
    (column name: column value).  When exactly one column results,
    including from a one-element collection, the bare dict is returned;
    otherwise a list of dicts is returned.
    """
    things, _ = tup(objs, ret_is_single=True)

    cols = []
    for thing in things:
        cols.append({thing._id: thing._id})

    return cols[0] if len(cols) == 1 else cols
@classmethod
def _column_to_obj(cls, columns):
    """Map view column(s) back to _view_of object(s).

    Must be the complement of _obj_to_column(): the name of each
    single-entry column dict is used as an id and looked up with
    ``cls._view_of._byID(..., return_dict=False)``.  A lone id is passed
    as a bare value rather than a one-element list.
    """
    cols, _ = tup(columns, ret_is_single=True)

    wanted = []
    for col in cols:
        wanted.append(col.keys()[0])

    if len(wanted) == 1:
        wanted = wanted[0]

    return cls._view_of._byID(wanted, return_dict=False)
@classmethod
def add_object(cls, obj, **kw):
    """Add a lookup for ``obj`` to the view.

    The row comes from _rowkey() and the column from _obj_to_column();
    any extra keyword arguments are forwarded to _set_values().
    """
    cls._set_values(cls._rowkey(obj), cls._obj_to_column(obj), **kw)
@classmethod
def query(cls, rowkey, after=None, reverse=False, count=1000):
    """Build a query over one row of the view that yields objects of the
    underlying _view_of class.

    rowkey  -- row of the view to read
    after   -- if given, start from the column representing this object
    reverse -- reddit-style direction flag; it is inverted before being
               handed to the query because the reverse convention for
               cassandra is the opposite
    count   -- maximum number of columns to read
    """
    view_query = cls._query_cls(cls, rowkey,
                                column_count=count,
                                column_reversed=(not reverse),
                                column_to_obj=cls._column_to_obj,
                                obj_to_column=cls._obj_to_column)
    view_query._after(after)
    return view_query
def _values(self):
    """Return the contents of the view as held in ``self._t``."""
    # TODO: at present this only grabs max_column_count columns
    contents = self._t
    return contents
@staticmethod
def _gen_uuid():
    """Generate a new time-based UUID (``uuid1``) for use as a view key.

    Convenience helper; time-based UUIDs are safe to use as TimeUUID
    indices in Cassandra.
    """
    fresh = uuid1()
    return fresh
@classmethod
@will_write
def _set_values(cls, row_key, col_values,
write_consistency_level = None,
ttl=None):
"""Set a set of column values in a row of a View without
"""Set a set of column values in a row of a view without
looking up the whole row first"""
# col_values =:= dict(col_name -> col_value)
@@ -942,16 +1113,14 @@ class View(ThingBase):
# based on the _default_ttls class dict. Note! There is no way
# to use this API to express that you don't want a TTL if
# there is a default set on either the row or the column
default_ttl = None if ttl is None else self._ttl
default_ttl = ttl or cls._ttl
with cls._cf.batch(write_consistency_level = cls._wcl(write_consistency_level)) as b:
# with some quick tweaks we could have a version that
# operates across multiple row keys, but this is not it
for k, v in updates.iteritems():
ttl = cls._default_ttls.get(k, default_ttl)
b.insert(row_key,
{k: v},
ttl = cls._default_ttls.get(k, default_ttl))
b.insert(row_key, {k: v},
ttl=cls._default_ttls.get(k, default_ttl))
# can we be smarter here?
thing_cache.delete(cls._cache_key_id(row_key))