Migrate Subreddit search to Cloudsearch
This expands upon the upload code in cloudsearch.py. cloudsearch.py's classes and functions have been refactored to support multiple indices backed by different object types.
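
For orientation, here is a rough usage sketch (not part of the commit) of how the refactored uploader classes are meant to be driven. It paraphrases the _run_changed() queue consumer in the diff below; the wrapper name index_changed_things is made up for illustration, and the sketch assumes the reddit app context (pylons' g and the g.CLOUDSEARCH_* endpoints referenced by this change).

import cPickle as pickle

from pylons import g

from r2.lib import cloudsearch
from r2.models import Thing


def index_changed_things(msgs):
    # Decode the queued change notices, let each uploader pick out the
    # fullnames of the types it indexes, then send one batch per index.
    changed = [pickle.loads(msg.body) for msg in msgs]
    fullnames = set()
    fullnames.update(cloudsearch.LinkUploader.desired_fullnames(changed))
    fullnames.update(cloudsearch.SubredditUploader.desired_fullnames(changed))
    things = Thing._by_fullname(fullnames, data=True, return_dict=False)
    link_time = cloudsearch.LinkUploader(g.CLOUDSEARCH_DOC_API,
                                         things=things).inject()
    sr_time = cloudsearch.SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API,
                                            things=things).inject()
    return link_time + sr_time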
@@ -38,8 +38,9 @@ from r2.lib.db.operators import desc
from r2.lib.db import queries
from r2.lib.db.tdb_cassandra import MultiColumnQuery
from r2.lib.strings import strings
from r2.lib.search import SearchQuery, SearchException, InvalidQuery
from r2.lib.solrsearch import RelatedSearchQuery, SubredditSearchQuery
from r2.lib.search import (SearchQuery, SubredditSearchQuery, SearchException,
                           InvalidQuery)
from r2.lib.solrsearch import RelatedSearchQuery
from r2.lib.contrib.pysolr import SolrError
from r2.lib import jsontemplates
from r2.lib import sup
@@ -1,5 +1,6 @@
import cPickle as pickle
from datetime import datetime
import functools
import httplib
import json
from lxml import etree
@@ -21,7 +22,6 @@ from r2.models import (Account, Link, Subreddit, Thing, All, DefaultSR,
_CHUNK_SIZE = 4000000 # Approx. 4 MB, to stay under the 5MB limit
_VERSION_OFFSET = 13257906857
ILLEGAL_XML = re.compile(u'[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]')
USE_SAFE_GET = False


def _safe_xml_str(s, use_encoding="utf-8"):
@@ -40,81 +40,6 @@ def _safe_xml_str(s, use_encoding="utf-8"):
    return s


class CloudSearchHTTPError(httplib.HTTPException): pass
class InvalidQuery(Exception): pass


def _version():
    '''Cloudsearch documents don't update unless the sent "version" field
    is higher than the one currently indexed. As our documents don't have
    "versions" and could in theory be updated multiple times in one second,
    for now, use "tenths of a second since 12:00:00.00 1/1/2012" as the
    "version" - this will last approximately 13 years until bumping up against
    the version max of 2^32 for cloudsearch docs'''
    return int(time.time() * 10) - _VERSION_OFFSET


### Document Upload ###

def add_xml(thing, version, srs, accounts):
    '''Return an etree XML representation of the thing, suitable for
    sending to cloudsearch

    '''
    add = etree.Element("add", id=thing._fullname, version=str(version),
                        lang="en")

    account = accounts[thing.author_id]
    sr = srs[thing.sr_id]
    nsfw = sr.over_18 or thing.over_18 or Link._nsfw.findall(thing.title)

    fields = {"ups": max(0, thing._ups),
              "downs": max(0, thing._downs),
              "num_comments": max(0, getattr(thing, 'num_comments', 0)),
              "fullname": thing._fullname,
              "subreddit": sr.name,
              "reddit": sr.name,
              "title": thing.title,
              "timestamp": thing._date.strftime("%s"),
              "sr_id": thing.sr_id,
              "over18": 1 if nsfw else 0,
              "is_self": 1 if thing.is_self else 0,
              "author_fullname": account._fullname,
              }

    if account._deleted:
        fields['author'] = '[deleted]'
    else:
        fields['author'] = account.name

    if thing.is_self:
        fields['site'] = g.domain
        if thing.selftext:
            fields['selftext'] = thing.selftext
    else:
        fields['url'] = thing.url
        try:
            fields['site'] = ' '.join(r2utils.UrlParser(thing.url).domain_permutations())
        except ValueError:
            # UrlParser couldn't handle thing.url, oh well
            pass

    for field_name, value in fields.iteritems():
        field = etree.SubElement(add, "field", name=field_name)
        field.text = _safe_xml_str(value)

    return add


def delete_xml(thing, version):
    '''Return the cloudsearch XML representation of
    "delete this from the index"

    '''
    delete = etree.Element("delete", id=thing._fullname, version=str(version))
    return delete


def safe_get(get_fn, ids, return_dict=True, **kw):
    items = {}
    for i in ids:
@@ -130,77 +55,258 @@ def safe_get(get_fn, ids, return_dict=True, **kw):
    return items.values()


def xml_from_things(things):
    '''Generate a <batch> XML tree to send to cloudsearch for
    adding/updating/deleting the given things

    '''
    batch = etree.Element("batch")

    author_ids = [thing.author_id for thing in things
                  if hasattr(thing, 'author_id')]
    try:
        accounts = Account._byID(author_ids, data=True, return_dict=True)
    except NotFound:
        if USE_SAFE_GET:
            accounts = safe_get(Account._byID, author_ids, data=True,
                                return_dict=True)
        else:
            raise
class CloudSearchHTTPError(httplib.HTTPException): pass
class InvalidQuery(Exception): pass

    sr_ids = [thing.sr_id for thing in things if hasattr(thing, 'sr_id')]
    try:
        srs = Subreddit._byID(sr_ids, data=True, return_dict=True)
    except NotFound:
        if USE_SAFE_GET:
            srs = safe_get(Subreddit._byID, sr_ids, data=True, return_dict=True)
        else:
            raise

class CloudSearchUploader(object):
    use_safe_get = False
    types = ()

    version = _version()
    for thing in things:
    def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET):
        self.doc_api = doc_api
        self._version_offset = version_offset
        self.things = self.desired_things(things) if things else []

    @classmethod
    def desired_fullnames(cls, items):
        '''Pull fullnames that represent instances of 'types' out of items'''
        fullnames = set()
        type_ids = [type_._type_id for type_ in cls.types]
        for item in items:
            item_type = r2utils.decompose_fullname(item['fullname'])[1]
            if item_type in type_ids:
                fullnames.add(item['fullname'])
        return fullnames

    @classmethod
    def desired_things(cls, things):
        return [t for t in things if isinstance(t, cls.types)]

    def _version_tenths(self):
        '''Cloudsearch documents don't update unless the sent "version" field
        is higher than the one currently indexed. As our documents don't have
        "versions" and could in theory be updated multiple times in one second,
        for now, use "tenths of a second since 12:00:00.00 1/1/2012" as the
        "version" - this will last approximately 13 years until bumping up against
        the version max of 2^32 for cloudsearch docs'''
        return int(time.time() * 10) - self._version_offset

    def _version_seconds(self):
        return int(time.time()) - int(self._version_offset / 10)

    _version = _version_tenths

    def add_xml(self, thing, version):
        add = etree.Element("add", id=thing._fullname, version=str(version),
                            lang="en")

        for field_name, value in self.fields(thing).iteritems():
            field = etree.SubElement(add, "field", name=field_name)
            field.text = _safe_xml_str(value)

        return add

    def delete_xml(self, thing, version=None):
        '''Return the cloudsearch XML representation of
        "delete this from the index"

        '''
        version = str(version or self._version())
        delete = etree.Element("delete", id=thing._fullname, version=version)
        return delete

    def delete_ids(self, ids):
        '''Delete documents from the index.
        'ids' should be a list of fullnames

        '''
        version = self._version()
        deletes = [etree.Element("delete", id=id_, version=str(version))
                   for id_ in ids]
        batch = etree.Element("batch")
        batch.extend(deletes)
        return self.send_documents(batch)

    def xml_from_things(self):
        '''Generate a <batch> XML tree to send to cloudsearch for
        adding/updating/deleting the given things

        '''
        batch = etree.Element("batch")
        self.batch_lookups()
        version = self._version()
        for thing in self.things:
            try:
                if thing._spam or thing._deleted:
                    delete_node = self.delete_xml(thing, version)
                    batch.append(delete_node)
                elif self.should_index(thing):
                    add_node = self.add_xml(thing, version)
                    batch.append(add_node)
            except (AttributeError, KeyError):
                # AttributeError may occur if a needed attribute is somehow
                # missing from the DB
                # KeyError will occur for whichever items (if any) triggered
                # the safe_get() call above, because the needed (but
                # invalid) Account or Subreddit is missing from the srs or
                # accounts dictionary
                # In either case, the sanest approach is to simply not index
                # the item. If it gets voted on later (or otherwise sent back
                # to the queue), perhaps it will have been fixed.
                pass
        return batch

    def should_index(self, thing):
        raise NotImplementedError

    def batch_lookups(self):
        pass

    def fields(self, thing):
        raise NotImplementedError

    def inject(self, quiet=False):
        '''Send things to cloudsearch. Return value is time elapsed, in seconds,
        of the communication with the cloudsearch endpoint

        '''
        xml_things = self.xml_from_things()

        cs_start = datetime.now(g.tz)
        if len(xml_things):
            sent = self.send_documents(xml_things)
            if not quiet:
                print sent
        return (datetime.now(g.tz) - cs_start).total_seconds()

    def send_documents(self, docs):
        '''Open a connection to the cloudsearch endpoint, and send the documents
        for indexing. Multiple requests are sent if a large number of documents
        are being sent (see chunk_xml())

        Raises CloudSearchHTTPError if the endpoint indicates a failure
        '''
        responses = []
        connection = httplib.HTTPConnection(self.doc_api, 80)
        chunker = chunk_xml(docs)
        try:
            if thing._spam or thing._deleted:
                delete_node = delete_xml(thing, version)
                batch.append(delete_node)
            elif thing.promoted is None and getattr(thing, "sr_id", None) != -1:
                add_node = add_xml(thing, version, srs, accounts)
                batch.append(add_node)
        except (AttributeError, KeyError):
            # AttributeError may occur if a needed attribute is somehow missing
            # from the DB
            # KeyError will occur for whichever items (if any) triggered the
            # safe_get() call above, because the needed (but invalid)
            # Account or Subreddit is missing from the srs or accounts
            # dictionary
            # In either case, the sanest approach is to simply not index the
            # item. If it gets voted on later (or otherwise sent back to the
            # queue), perhaps it will have been fixed.
            pass
    return batch
            for data in chunker:
                headers = {}
                headers['Content-Type'] = 'application/xml'
                # HTTPLib calculates Content-Length header automatically
                connection.request('POST', "/2011-02-01/documents/batch",
                                   data, headers)
                response = connection.getresponse()
                if 200 <= response.status < 300:
                    responses.append(response.read())
                else:
                    raise CloudSearchHTTPError(response.status,
                                               response.reason,
                                               response.read())
        finally:
            connection.close()
        return responses


def delete_ids(ids):
    '''Delete documents from the index. 'ids' should be a list of fullnames'''
    version = _version()
    deletes = [etree.Element("delete", id=id_, version=str(version))
               for id_ in ids]
    batch = etree.Element("batch")
    batch.extend(deletes)
    return send_documents(batch)


def inject(things):
    '''Send things to cloudsearch. Return value is time elapsed, in seconds,
    of the communication with the cloudsearch endpoint
class LinkUploader(CloudSearchUploader):
    types = (Link,)

    '''
    xml_things = xml_from_things(things)
    def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET):
        super(LinkUploader, self).__init__(doc_api, things, version_offset)
        self.accounts = {}
        self.srs = {}

    cs_start = datetime.now(g.tz)
    if len(xml_things):
        print send_documents(xml_things)
    return (datetime.now(g.tz) - cs_start).total_seconds()
    def fields(self, thing):
        '''Return fields relevant to a Link search index'''
        account = self.accounts[thing.author_id]
        sr = self.srs[thing.sr_id]
        nsfw = sr.over_18 or thing.over_18 or Link._nsfw.findall(thing.title)

        fields = {"ups": max(0, thing._ups),
                  "downs": max(0, thing._downs),
                  "num_comments": max(0, getattr(thing, 'num_comments', 0)),
                  "fullname": thing._fullname,
                  "subreddit": sr.name,
                  "reddit": sr.name,
                  "title": thing.title,
                  "timestamp": thing._date.strftime("%s"),
                  "sr_id": thing.sr_id,
                  "over18": 1 if nsfw else 0,
                  "is_self": 1 if thing.is_self else 0,
                  "author_fullname": account._fullname,
                  }

        if account._deleted:
            fields['author'] = '[deleted]'
        else:
            fields['author'] = account.name

        if thing.is_self:
            fields['site'] = g.domain
            if thing.selftext:
                fields['selftext'] = thing.selftext
        else:
            fields['url'] = thing.url
            try:
                url = r2utils.UrlParser(thing.url)
                domains = ' '.join(url.domain_permutations())
                fields['site'] = domains
            except ValueError:
                # UrlParser couldn't handle thing.url, oh well
                pass

        return fields

    def batch_lookups(self):
        author_ids = [thing.author_id for thing in self.things
                      if hasattr(thing, 'author_id')]
        try:
            self.accounts = Account._byID(author_ids, data=True,
                                          return_dict=True)
        except NotFound:
            if self.use_safe_get:
                self.accounts = safe_get(Account._byID, author_ids, data=True,
                                         return_dict=True)
            else:
                raise

        sr_ids = [thing.sr_id for thing in self.things
                  if hasattr(thing, 'sr_id')]
        try:
            self.srs = Subreddit._byID(sr_ids, data=True, return_dict=True)
        except NotFound:
            if self.use_safe_get:
                self.srs = safe_get(Subreddit._byID, sr_ids, data=True,
                                    return_dict=True)
            else:
                raise

    def should_index(self, thing):
        return (thing.promoted is None and getattr(thing, "sr_id", None) != -1)


class SubredditUploader(CloudSearchUploader):
    types = (Subreddit,)
    _version = CloudSearchUploader._version_seconds

    def fields(self, thing):
        fields = {'name': thing.name,
                  'title': thing.title,
                  'type': thing.type, # XXX Int type?
                  'language': thing.lang,
                  'header_title': thing.header_title,
                  'description': thing.public_description,
                  'sidebar': thing.description,
                  'over18': thing.over_18, # Reminder: copy-field to nsfw
                  'link_type': thing.link_type, # XXX Use integer?
                  'activity': thing._downs,
                  'subscribers': thing._ups,
                  }
        return fields

    def should_index(self, thing):
        return getattr(thing, 'author_id', None) != -1


def chunk_xml(xml, depth=0):
@@ -218,7 +324,8 @@ def chunk_xml(xml, depth=0):
    half = len(xml) / 2
    left_half = xml # for ease of reading
    right_half = etree.Element("batch")
    # etree magic simultaneously removes the elements from the other tree
    # etree magic simultaneously removes the elements from one tree
    # when they are appended to a different tree
    right_half.append(xml[half:])
    for chunk in chunk_xml(left_half, depth=depth):
        yield chunk
@@ -226,45 +333,6 @@ def chunk_xml(xml, depth=0):
        yield chunk


def send_documents(docs):
    '''Open a connection to the cloudsearch endpoint, and send the documents
    for indexing. Multiple requests are sent if a large number of documents
    are being sent (see chunk_xml())

    Raises CloudSearchHTTPError if the endpoint indicates a failure
    '''
    responses = []
    connection = httplib.HTTPConnection(g.CLOUDSEARCH_DOC_API, 80)
    chunker = chunk_xml(docs)
    try:
        for data in chunker:
            headers = {}
            headers['Content-Type'] = 'application/xml'
            # HTTPLib calculates Content-Length header automatically
            connection.request('POST', "/2011-02-01/documents/batch",
                               data, headers)
            response = connection.getresponse()
            if 200 <= response.status < 300:
                responses.append(response.read())
            else:
                raise CloudSearchHTTPError(response.status, response.reason,
                                           response.read())
    finally:
        connection.close()
    return responses


def _desired_things(items, types):
    '''Pull fullnames that represent instances of 'types' out of items'''
    # This will fail if the _type_id for some things is >36
    fullnames = set()
    type_ids = [r2utils.to36(type_._type_id) for type_ in types]
    for item in items:
        if item['fullname'][1] in type_ids:
            fullnames.add(item['fullname'])
    return fullnames


def _run_changed(msgs, chan):
    '''Consume the cloudsearch_changes queue, and print reporting information
    on how long it took and how many remain
@@ -274,12 +342,18 @@ def _run_changed(msgs, chan):

    changed = [pickle.loads(msg.body) for msg in msgs]

    # Only handle links to start with

    fullnames = _desired_things(changed, (Link,))
    fullnames = set()
    fullnames.update(LinkUploader.desired_fullnames(changed))
    fullnames.update(SubredditUploader.desired_fullnames(changed))
    things = Thing._by_fullname(fullnames, data=True, return_dict=False)

    cloudsearch_time = inject(things)
    link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things)
    subreddit_uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API,
                                           things=things)

    link_time = link_uploader.inject()
    subreddit_time = subreddit_uploader.inject()
    cloudsearch_time = link_time + subreddit_time

    totaltime = (datetime.now(g.tz) - start).total_seconds()

@@ -297,8 +371,7 @@ def run_changed(drain=False, min_size=500, limit=1000, sleep_time=10,

    '''
    if use_safe_get:
        global USE_SAFE_GET
        USE_SAFE_GET = True
        CloudSearchUploader.use_safe_get = True
    amqp.handle_items('cloudsearch_changes', _run_changed, min_size=min_size,
                      limit=limit, drain=drain, sleep_time=sleep_time,
                      verbose=verbose)
@@ -308,16 +381,21 @@ def _progress_key(item):
    return "%s/%s" % (item._id, item._date)


_REBUILD_INDEX_CACHE_KEY = "cloudsearch_cursor"
_REBUILD_INDEX_CACHE_KEY = "cloudsearch_cursor_%s"


def rebuild_index(start_at=None, sleeptime=1, cls=Link, estimate=50000000,
                  chunk_size=1000):
def rebuild_link_index(start_at=None, sleeptime=1, cls=Link,
                       uploader=LinkUploader, doc_api='CLOUDSEARCH_DOC_API',
                       estimate=50000000, chunk_size=1000):
    cache_key = _REBUILD_INDEX_CACHE_KEY % uploader.__name__.lower()
    doc_api = getattr(g, doc_api)
    uploader = uploader(doc_api)

    if start_at is _REBUILD_INDEX_CACHE_KEY:
        start_at = g.cache.get(start_at)
        start_at = g.cache.get(cache_key)
        if not start_at:
            raise ValueError("Told me to use '%s' key, but it's not set" %
                             _REBUILD_INDEX_CACHE_KEY)
                             cache_key)

    q = cls._query(cls.c._deleted == (True, False),
                   sort=desc('_date'), data=True)
@@ -329,11 +407,12 @@ def rebuild_index(start_at=None, sleeptime=1, cls=Link, estimate=50000000,
    q = r2utils.progress(q, verbosity=1000, estimate=estimate, persec=True,
                         key=_progress_key)
    for chunk in r2utils.in_chunks(q, size=chunk_size):
        uploader.things = chunk
        for x in range(5):
            try:
                inject(chunk)
                uploader.inject()
            except httplib.HTTPException as err:
                print "Got %s, sleeping %s secs" % (err, x)
                print "Got %s, sleeping %s secs" % (err, x)
                time.sleep(x)
                continue
            else:
@@ -341,17 +420,33 @@ def rebuild_index(start_at=None, sleeptime=1, cls=Link, estimate=50000000,
            else:
                raise err
        last_update = chunk[-1]
        g.cache.set(_REBUILD_INDEX_CACHE_KEY, last_update._fullname)
        g.cache.set(cache_key, last_update._fullname)
        time.sleep(sleeptime)


def test_run(start_link, count=1000):
rebuild_subreddit_index = functools.partial(rebuild_link_index,
                                            cls=Subreddit,
                                            uploader=SubredditUploader,
                                            doc_api='CLOUDSEARCH_SUBREDDIT_DOC_API',
                                            estimate=200000,
                                            chunk_size=1000)


def test_run_link(start_link, count=1000):
    '''Inject `count` number of links, starting with `start_link`'''
    if isinstance(start_link, basestring):
        start_link = int(start_link, 36)
    links = Link._byID(range(start_link - count, start_link), data=True,
                       return_dict=False)
    return inject(links)
    uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=links)
    return uploader.inject()


def test_run_srs(*sr_names):
    '''Inject Subreddits by name into the index'''
    srs = Subreddit._by_name(sr_names).values()
    uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API, things=srs)
    return uploader.inject()
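
As a quick illustration of the rebuild entry points above (a sketch, not code from the diff): rebuild_subreddit_index is just rebuild_link_index re-parameterized via functools.partial, so a maintenance shell might drive it roughly like this. The resume call relies on the _REBUILD_INDEX_CACHE_KEY sentinel behaviour shown above; the per-uploader cursor key works out to "cloudsearch_cursor_subreddituploader".

from r2.lib import cloudsearch

# Full rebuild of the subreddit index, 1000 Subreddits per chunk, pausing
# one second between chunks and saving the cursor after each chunk.
cloudsearch.rebuild_subreddit_index(sleeptime=1)

# Resume an interrupted rebuild from the cached per-uploader cursor.
cloudsearch.rebuild_subreddit_index(start_at=cloudsearch._REBUILD_INDEX_CACHE_KEY)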

### Query Code ###
@@ -385,7 +480,9 @@ INVALID_QUERY_CODES = ('CS-UnknownFieldInMatchExpression',
                       'CS-IncorrectFieldTypeInMatchExpression')
def basic_query(query=None, bq=None, facets=("reddit",), facet_count=10,
                size=1000, start=0, rank="hot", return_fields=None,
                record_stats=False):
                record_stats=False, search_api=None):
    if search_api is None:
        search_api = g.CLOUDSEARCH_SEARCH_API
    path = _encode_query(query, bq, facets, facet_count, size, start, rank,
                         return_fields)
    timer = None
@@ -393,7 +490,7 @@ def basic_query(query=None, bq=None, facets=("reddit",), facet_count=10,
        timer = g.stats.get_timer("cloudsearch_timer")
    if timer:
        timer.start()
    connection = httplib.HTTPConnection(g.CLOUDSEARCH_SEARCH_API, 80)
    connection = httplib.HTTPConnection(search_api, 80)
    try:
        connection.request('GET', path)
        resp = connection.getresponse()
@@ -417,11 +514,28 @@ def basic_query(query=None, bq=None, facets=("reddit",), facet_count=10,
        connection.close()
    if timer:
        timer.stop()


    return json.loads(response)


basic_link = functools.partial(basic_query, facets=("reddit",), facet_count=10,
                               size=10, start=0, rank="hot",
                               return_fields=['title', 'reddit',
                                              'author_fullname'],
                               record_stats=False,
                               search_api=g.CLOUDSEARCH_SEARCH_API)


basic_subreddit = functools.partial(basic_query,
                                    facets=(),
                                    size=10, start=0,
                                    rank="relevance",
                                    return_fields=['title', 'reddit',
                                                   'author_fullname'],
                                    record_stats=False,
                                    search_api=g.CLOUDSEARCH_SUBREDDIT_SEARCH_API)


def _encode_query(query, bq, facets, facet_count, size, start, rank,
                  return_fields):
    if not (query or bq):
@@ -448,21 +562,14 @@ def _encode_query(query, bq, facets, facet_count, size, start, rank,

class CloudSearchQuery(object):
    '''Represents a search query sent to cloudsearch'''
    sorts = {'relevance': '-relevance',
             'top': '-top',
             'new': '-timestamp',
             }
    sorts_menu_mapping = {'relevance': 1,
                          'new': 2,
                          'top': 3,
                          }
    search_api = None
    sorts = {}
    sorts_menu_mapping = {}
    known_syntaxes = ("cloudsearch", "lucene", "plain")
    default_syntax = "plain"
    lucene_parser = None

    lucene_parser = l2cs.make_parser(int_fields=['timestamp'],
                                     yesno_fields=['over18', 'is_self'])
    known_syntaxes = ("cloudsearch", "lucene")
    default_syntax = "lucene"

    def __init__(self, query, sr, sort, syntax=None):
    def __init__(self, query, sr=None, sort=None, syntax=None):
        if syntax is None:
            syntax = self.default_syntax
        elif syntax not in self.known_syntaxes:
@@ -489,6 +596,114 @@ class CloudSearchQuery(object):
        self.results = Results(after_docs, hits, facets)
        return self.results

    def _run(self, start=0, num=1000, _update=False):
        '''Run the search against self.query'''
        q = None
        if self.syntax == "cloudsearch":
            self.bq = self.customize_query(self.query)
        elif self.syntax == "lucene":
            bq = l2cs.convert(self.query, self.lucene_parser)
            self.converted_data = {"syntax": "cloudsearch",
                                   "converted": bq}
            self.bq = self.customize_query(bq)
        elif self.syntax == "plain":
            q = self.query
        if g.sqlprinting:
            g.log.info("%s", self)
        return self._run_cached(q, self.bq, self.sort, start=start, num=num,
                                _update=_update)

    def customize_query(self, bq):
        return bq

    def __repr__(self):
        '''Return a string representation of this query'''
        result = ["<", self.__class__.__name__, "> query:",
                  repr(self.query), " "]
        if self.bq:
            result.append(" bq:")
            result.append(repr(self.bq))
            result.append(" ")
        result.append("sort:")
        result.append(self.sort)
        return ''.join(result)

    @classmethod
    def _run_cached(cls, query, bq, sort="relevance", start=0, num=1000,
                    _update=False):
        '''Query the cloudsearch API. _update parameter allows for supposed
        easy memoization at later date.

        Example result set:

        {u'facets': {u'reddit': {u'constraints':
                                    [{u'count': 114, u'value': u'politics'},
                                    {u'count': 42, u'value': u'atheism'},
                                    {u'count': 27, u'value': u'wtf'},
                                    {u'count': 19, u'value': u'gaming'},
                                    {u'count': 12, u'value': u'bestof'},
                                    {u'count': 12, u'value': u'tf2'},
                                    {u'count': 11, u'value': u'AdviceAnimals'},
                                    {u'count': 9, u'value': u'todayilearned'},
                                    {u'count': 9, u'value': u'pics'},
                                    {u'count': 9, u'value': u'funny'}]}},
         u'hits': {u'found': 399,
                   u'hit': [{u'id': u't3_11111'},
                            {u'id': u't3_22222'},
                            {u'id': u't3_33333'},
                            {u'id': u't3_44444'},
                            ...
                            ],
                   u'start': 0},
         u'info': {u'cpu-time-ms': 10,
                   u'messages': [{u'code': u'CS-InvalidFieldOrRankAliasInRankParameter',
                                  u'message': u"Unable to create score object for rank '-hot'",
                                  u'severity': u'warning'}],
                   u'rid': u'<hash>',
                   u'time-ms': 9},
         u'match-expr': u"(label 'my query')",
         u'rank': u'-text_relevance'}

        '''
        response = basic_query(query=query, bq=bq, size=num, start=start,
                               rank=sort, search_api=cls.search_api,
                               record_stats=True)

        warnings = response['info'].get('messages', [])
        for warning in warnings:
            g.log.warn("%(code)s (%(severity)s): %(message)s" % warning)

        hits = response['hits']['found']
        docs = [doc['id'] for doc in response['hits']['hit']]
        facets = response.get('facets', {})
        for facet in facets.keys():
            values = facets[facet]['constraints']
            facets[facet] = values

        results = Results(docs, hits, facets)
        return results


class LinkSearchQuery(CloudSearchQuery):
    search_api = g.CLOUDSEARCH_SEARCH_API
    sorts = {'relevance': '-relevance',
             'top': '-top',
             'new': '-timestamp',
             }
    sorts_menu_mapping = {'relevance': 1,
                          'new': 2,
                          'top': 3,
                          }

    lucene_parser = l2cs.make_parser(int_fields=['timestamp'],
                                     yesno_fields=['over18', 'is_self'])
    known_syntaxes = ("cloudsearch", "lucene")
    default_syntax = "lucene"

    def customize_query(self, bq):
        subreddit_query = self._get_sr_restriction(self.sr)
        return self.create_boolean_query(bq, subreddit_query)

    @classmethod
    def create_boolean_query(cls, query, subreddit_query):
        '''Join a (user-entered) text query with the generated subreddit query
@@ -545,83 +760,17 @@ class CloudSearchQuery(object):
            bq = ["sr_id:%s" % sr._id]

        return ' '.join(bq)
|
||||
class SubredditSearchQuery(CloudSearchQuery):
|
||||
search_api = g.CLOUDSEARCH_SUBREDDIT_SEARCH_API
|
||||
sorts = {'relevance': '-text_relevance',
|
||||
None: '-text_relevance',
|
||||
}
|
||||
sorts_menu_mapping = {'relevance': 1,
|
||||
}
|
||||
|
||||
def _run(self, start=0, num=1000, _update=False):
|
||||
'''Run the search against self.query'''
|
||||
subreddit_query = self._get_sr_restriction(self.sr)
|
||||
if self.syntax == "cloudsearch":
|
||||
base_query = self.query
|
||||
elif self.syntax == "lucene":
|
||||
base_query = l2cs.convert(self.query, self.lucene_parser)
|
||||
self.converted_data = {"syntax": "cloudsearch",
|
||||
"converted": base_query}
|
||||
self.bq = self.create_boolean_query(base_query, subreddit_query)
|
||||
if g.sqlprinting:
|
||||
g.log.info("%s", self)
|
||||
return self._run_cached(self.bq, self.sort, start=start, num=num,
|
||||
_update=_update)
|
||||
|
||||
def __repr__(self):
|
||||
'''Return a string representation of this query'''
|
||||
result = ["<", self.__class__.__name__, "> query:",
|
||||
repr(self.query), " "]
|
||||
if self.bq:
|
||||
result.append(" bq:")
|
||||
result.append(repr(self.bq))
|
||||
result.append(" ")
|
||||
result.append("sort:")
|
||||
result.append(self.sort)
|
||||
return ''.join(result)
|
||||
|
||||
@classmethod
|
||||
def _run_cached(cls, bq, sort="hot", start=0, num=1000, _update=False):
|
||||
'''Query the cloudsearch API. _update parameter allows for supposed
|
||||
easy memoization at later date.
|
||||
|
||||
Example result set:
|
||||
|
||||
{u'facets': {u'reddit': {u'constraints':
|
||||
[{u'count': 114, u'value': u'politics'},
|
||||
{u'count': 42, u'value': u'atheism'},
|
||||
{u'count': 27, u'value': u'wtf'},
|
||||
{u'count': 19, u'value': u'gaming'},
|
||||
{u'count': 12, u'value': u'bestof'},
|
||||
{u'count': 12, u'value': u'tf2'},
|
||||
{u'count': 11, u'value': u'AdviceAnimals'},
|
||||
{u'count': 9, u'value': u'todayilearned'},
|
||||
{u'count': 9, u'value': u'pics'},
|
||||
{u'count': 9, u'value': u'funny'}]}},
|
||||
u'hits': {u'found': 399,
|
||||
u'hit': [{u'id': u't3_11111'},
|
||||
{u'id': u't3_22222'},
|
||||
{u'id': u't3_33333'},
|
||||
{u'id': u't3_44444'},
|
||||
...
|
||||
],
|
||||
u'start': 0},
|
||||
u'info': {u'cpu-time-ms': 10,
|
||||
u'messages': [{u'code': u'CS-InvalidFieldOrRankAliasInRankParameter',
|
||||
u'message': u"Unable to create score object for rank '-hot'",
|
||||
u'severity': u'warning'}],
|
||||
u'rid': u'<hash>',
|
||||
u'time-ms': 9},
|
||||
u'match-expr': u"(label 'my query')",
|
||||
u'rank': u'-text_relevance'}
|
||||
|
||||
'''
|
||||
response = basic_query(bq=bq, size=num, start=start, rank=sort,
|
||||
record_stats=True)
|
||||
|
||||
warnings = response['info'].get('messages', [])
|
||||
for warning in warnings:
|
||||
g.log.warn("%(code)s (%(severity)s): %(message)s" % warning)
|
||||
|
||||
hits = response['hits']['found']
|
||||
docs = [doc['id'] for doc in response['hits']['hit']]
|
||||
facets = response['facets']
|
||||
for facet in facets.keys():
|
||||
values = facets[facet]['constraints']
|
||||
facets[facet] = values
|
||||
|
||||
results = Results(docs, hits, facets)
|
||||
return results
|
||||
lucene_parser = l2cs.make_parser(int_fields=['timestamp'],
|
||||
yesno_fields=['over18', 'is_self'])
|
||||
known_syntaxes = ("plain",)
|
||||
default_syntax = "plain"
|
||||
|
||||
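
To make the query split concrete, here is a hedged sketch (not from the commit) of how the two query classes defined above might be exercised. Only __init__ and _run appear in this diff, so any other entry point a controller uses is an assumption, and the example subreddit lookup is purely illustrative.

from r2.lib import cloudsearch
from r2.models import Subreddit

# Link search: lucene syntax by default, optionally restricted to a
# subreddit, sent to g.CLOUDSEARCH_SEARCH_API via the class-level search_api.
sr = Subreddit._by_name('programming')
link_results = cloudsearch.LinkSearchQuery('cute cats', sr=sr,
                                           sort='relevance')._run(num=25)

# Subreddit search: plain syntax only, ranked by '-text_relevance',
# sent to g.CLOUDSEARCH_SUBREDDIT_SEARCH_API.
sr_results = cloudsearch.SubredditSearchQuery('pics')._run(num=10)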
@@ -25,6 +25,7 @@ import r2.lib.cloudsearch as cloudsearch
InvalidQuery = (cloudsearch.InvalidQuery,)
SearchException = (cloudsearch.CloudSearchHTTPError,)

SearchQuery = cloudsearch.CloudSearchQuery
SearchQuery = cloudsearch.LinkSearchQuery
SubredditSearchQuery = cloudsearch.SubredditSearchQuery

sorts = cloudsearch.CloudSearchQuery.sorts_menu_mapping
sorts = cloudsearch.LinkSearchQuery.sorts_menu_mapping
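
Net effect of the r2/lib/search.py hunk on callers (a summary sketch, assuming the rest of that wrapper module is unchanged): code that imported SearchQuery keeps working but now talks to the link index, while the controller change at the top of this diff imports the new SubredditSearchQuery for subreddit search.

import r2.lib.cloudsearch as cloudsearch
from r2.lib.search import SearchQuery, SubredditSearchQuery

# The wrapper now re-exports the concrete per-index query classes.
assert SearchQuery is cloudsearch.LinkSearchQuery
assert SubredditSearchQuery is cloudsearch.SubredditSearchQuery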