query_cache: Add support for precomputed queries.

This commit is contained in:
Neil Williams
2012-10-19 13:37:26 -07:00
parent 33400828c0
commit 16d0b1c480

View File

@@ -43,7 +43,7 @@ from pycassa.batch import Mutator
from r2.models import Thing
from r2.lib.db import tdb_cassandra
from r2.lib.db.operators import asc
from r2.lib.db.operators import asc, desc, BooleanOp
from r2.lib.db.sorts import epoch_seconds
from r2.lib.utils import flatten, to36
@@ -129,11 +129,12 @@ class CachedQuery(_CachedQueryBase):
"""
def __init__(self, model, key, sort, filter_fn):
def __init__(self, model, key, sort, filter_fn, is_precomputed):
self.model = model
self.key = key
self.filter = filter_fn
self.timestamps = None # column timestamps, for safe pruning
self.is_precomputed = is_precomputed
super(CachedQuery, self).__init__(sort)
def _make_item_tuple(self, item):
@@ -159,13 +160,28 @@ class CachedQuery(_CachedQueryBase):
@classmethod
def _fetch_multi(self, queries):
"""Fetch the unsorted query results for multiple queries at once.
In the case of precomputed queries, do an extra lookup first to
determine which row key to find the latest precomputed values for the
query in.
"""
by_model = collections.defaultdict(list)
for q in queries:
by_model[q.model].append(q)
cached_queries = {}
for model, queries in by_model.iteritems():
fetched = model.get([q.key for q in queries])
pure, need_mangling = [], []
for q in queries:
if not q.is_precomputed:
pure.append(q.key)
else:
need_mangling.append(q.key)
mangled = model.index_mangle_keys(need_mangling)
fetched = model.get(pure + mangled)
cached_queries.update(fetched)
for q in queries:
@@ -291,6 +307,7 @@ class CachedQueryMutator(object):
LOG.debug("Inserting %r into query %r", things, query)
assert not query.is_precomputed
query._insert(self.mutator, things)
if (random.random() / len(things)) < PRUNE_CHANCE:
@@ -339,6 +356,33 @@ def filter_thing2(x):
return x._thing2
def _is_query_precomputed(query):
"""Return if this query must be updated offline in a batch job.
Simple queries can be modified in place in the query cache, but ones
with more complicated eligibility criteria, such as a time limit ("top
this month") cannot be modified this way and must instead be
recalculated periodically. Rather than replacing a single row
repeatedly, the precomputer stores in a new row every time it runs and
updates an index of the latest run.
"""
# visit all the nodes in the rule tree to see if there are time limitations
# if we find one, this query is one that must be precomputed
rules = list(query._rules)
while rules:
rule = rules.pop()
if isinstance(rule, BooleanOp):
rules.extend(rule.ops)
continue
if rule.lval.name == "_date":
return True
return False
def cached_query(model, filter_fn=filter_identity, sort=None):
"""Decorate a function describing a cached query.
@@ -375,10 +419,19 @@ def cached_query(model, filter_fn=filter_identity, sort=None):
row_key = '.'.join(row_key_components)
query = fn(*args)
query_sort = query._sort if query else sort
assert query_sort
return CachedQuery(model, row_key, query_sort, filter_fn)
if query:
# sql-backed query
query_sort = query._sort
is_precomputed = _is_query_precomputed(query)
else:
# pure-cassandra query
assert sort
query_sort = sort
is_precomputed = False
return CachedQuery(model, row_key, query_sort, filter_fn,
is_precomputed)
return cached_query_wrapper
return cached_query_decorator
@@ -443,6 +496,22 @@ class _BaseQueryCache(object):
return res
@classmethod
def index_mangle_keys(cls, keys):
if not keys:
return []
index_keys = ["/".join((key, "index")) for key in keys]
rows = cls._cf.multiget(index_keys,
column_reversed=True,
column_count=1)
res = []
for key, columns in rows.iteritems():
index_component = columns.keys()[0]
res.append("/".join((key, index_component)))
return res
@classmethod
@tdb_cassandra.will_write
def insert(cls, mutator, key, columns):