Use s3 to read and store subreddit sitemaps.

This commit is contained in:
Shawn Krisman
2016-06-13 14:12:54 -07:00
parent 4b2a669d6f
commit 7edd1ec8cd
15 changed files with 394 additions and 243 deletions

View File

@@ -300,6 +300,17 @@ modmail_sender_email =
modmail_system_email =
modmail_email_domain =
############################################ SITEMAPS
# path of the subreddit sitemap index
sitemap_subreddit_keyname =
# s3 information related to where the sitemaps are statically stored
sitemap_upload_s3_bucket =
sitemap_s3_static_host =
sitemap_subreddit_static_url =
# the SQS queue that we use to receive messages to modify our sitemaps
sitemap_sqs_queue =
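As a quick sketch of how these surface at runtime: the parsed ini values become attributes of the pylons app_globals object, and each option is consumed by code further down in this commit.

from pylons import app_globals as g

g.sitemap_upload_s3_bucket      # store.py / watcher.py: bucket the gzipped sitemaps are uploaded to
g.sitemap_subreddit_keyname     # store.py / watcher.py: s3 key of the sitemap index within that bucket
g.sitemap_s3_static_host        # generate.py: host prefix for the <loc> urls inside the sitemap index
g.sitemap_subreddit_static_url  # pages.py / robots.txt template: the Sitemap: url advertised to crawlers
g.sitemap_sqs_queue             # watcher.py: SQS queue polled for new sitemap ETL drops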
############################################ MEDIA STORAGE
# which backend provider to use for media (thumbnails, subreddit stylesheets,
# subreddit images, app icons). options are:

View File

@@ -61,9 +61,6 @@ def make_map(config):
mc('/robots.txt', controller='robots', action='robots')
mc('/crossdomain', controller='robots', action='crossdomain')
mc('/sitemap', controller='sitemap', action='index')
mc('/subreddit_sitemap', controller='sitemap', action='subreddits')
mc('/login', controller='forms', action='login')
mc('/register', controller='forms', action='register')
mc('/logout', controller='forms', action='logout')

View File

@@ -96,7 +96,6 @@ def load_controllers():
from oauth2 import OAuth2AccessController
from redirect import RedirectController
from robots import RobotsController
from sitemap import SitemapController
from ipn import IpnController
from ipn import StripeController
from ipn import CoinbaseController

View File

@@ -1,42 +0,0 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2015 reddit
# Inc. All Rights Reserved.
###############################################################################
from pylons import response
from r2.controllers.reddit_base import MinimalController
from r2.lib.db import tdb_cassandra
from r2.lib.validator import validate, VInt
from r2.models.sitemap import Sitemap
class SitemapController(MinimalController):
def GET_index(self):
response.content_type = 'application/xml'
return Sitemap.sitemap_index()
@validate(index=VInt('index', 0, 50000))
def GET_subreddits(self, index):
response.content_type = 'application/xml'
try:
return Sitemap.subreddit_sitemap(index)
except tdb_cassandra.NotFound:
return self.abort404()

View File

@@ -0,0 +1,120 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2015 reddit
# Inc. All Rights Reserved.
###############################################################################
import snappy
import struct
class HadoopStreamDecompressor(object):
"""This class implements the decompressor-side of the hadoop framing
format.
Hadoop fraiming format consists of one or more blocks, each of which is
composed of one or more compressed subblocks. The block size is the
uncompressed size, while the subblock size is the size of the compressed
data.
https://github.com/andrix/python-snappy/pull/35/files
"""
__slots__ = ["_buf", "_block_size", "_block_read", "_subblock_size"]
def __init__(self):
self._buf = b""
self._block_size = None
self._block_read = 0
self._subblock_size = None
def decompress(self, data):
self._buf += data
output = b""
while True:
# _decompress_block only makes progress once it has read the block size;
# it then decompresses as many complete subblocks as are buffered.
buf = self._decompress_block()
if len(buf) > 0:
output += buf
else:
break
return output
def _decompress_block(self):
if self._block_size is None:
if len(self._buf) <= 4:
return b""
self._block_size = struct.unpack(">i", self._buf[:4])[0]
self._buf = self._buf[4:]
output = b""
while self._block_read < self._block_size:
buf = self._decompress_subblock()
if len(buf) > 0:
output += buf
else:
# Buffer doesn't contain full subblock
break
if self._block_read == self._block_size:
# We finished reading this block, so reinitialize.
self._block_read = 0
self._block_size = None
return output
def _decompress_subblock(self):
if self._subblock_size is None:
if len(self._buf) <= 4:
return b""
self._subblock_size = struct.unpack(">i", self._buf[:4])[0]
self._buf = self._buf[4:]
# Only attempt to decompress complete subblocks.
if len(self._buf) < self._subblock_size:
return b""
compressed = self._buf[:self._subblock_size]
self._buf = self._buf[self._subblock_size:]
uncompressed = snappy.uncompress(compressed)
self._block_read += len(uncompressed)
self._subblock_size = None
return uncompressed
def flush(self):
if self._buf != b"":
raise snappy.UncompressError("chunk truncated")
return b""
def copy(self):
copy = HadoopStreamDecompressor()
copy._buf = self._buf
copy._block_size = self._block_size
copy._block_read = self._block_read
copy._subblock_size = self._subblock_size
return copy
def hadoop_decompress(src, dst, blocksize=snappy._STREAM_TO_STREAM_BLOCK_SIZE):
decompressor = HadoopStreamDecompressor()
while True:
buf = src.read(blocksize)
if not buf:
break
buf = decompressor.decompress(buf)
if buf:
dst.write(buf)
decompressor.flush() # makes sure the stream ended well
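For a concrete picture of the framing the decompressor expects, here is a minimal round-trip sketch (Python 2, matching the surrounding code; the payload is arbitrary test data): a block is a 4-byte big-endian uncompressed size followed by subblocks, and each subblock is a 4-byte big-endian compressed size followed by the snappy bytes.

import struct
import snappy

payload = b"hello sitemaps" * 100
compressed = snappy.compress(payload)
frame = (
    struct.pack(">i", len(payload)) +     # block size: total uncompressed bytes
    struct.pack(">i", len(compressed)) +  # subblock size: compressed byte count
    compressed                            # the snappy-compressed subblock itself
)

decompressor = HadoopStreamDecompressor()
assert decompressor.decompress(frame) == payload
decompressor.flush()  # would raise snappy.UncompressError if the stream ended mid-block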

View File

@@ -203,8 +203,10 @@ def responsive(res, space_compress=None):
class Robots(Templated):
pass
def __init__(self, **context):
Templated.__init__(self, **context)
self.subreddit_sitemap = g.sitemap_subreddit_static_url
class CrossDomain(Templated):
pass

View File

@@ -30,6 +30,7 @@ import sys
import time
import datetime
import pytz
from collections import namedtuple
from pylons import app_globals as g
@@ -69,6 +70,17 @@ def _from_path(path):
return bucket, key
S3Path = namedtuple('S3Path', ['bucket', 'key'])
def parse_s3_path(path):
return S3Path(*_from_path(path))
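# Illustrative only (assuming _from_path splits scheme-style locations the way
# the sitemap watcher's s3:// paths require):
#     parse_s3_path('s3://example-bucket/some/prefix')
#     -> S3Path(bucket='example-bucket', key='some/prefix')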
def format_expires(expires):
return expires.strftime(EXPIRES_DATE_FORMAT)
def get_text_from_s3(s3_connection, path):
"""Read a file from S3 and return it as text."""
bucket_name, key_name = _from_path(path)
@@ -363,4 +375,3 @@ def get_post_args(
"action": "//%s.%s" % (bucket, g.s3_media_domain),
"fields": fields,
}

View File

@@ -34,8 +34,7 @@ Reddit contains tons and tons of links. Generating them on the fly is simply
impractical. The solution this module implements is a very slow batch that
goes through every subreddit and every link and creates crawlable permalinks
from them. These links are then put into sitemaps and stored in
r2.lib.sitemap.Sitemap. The sitemap controllers then simply serve up the xml
files that they find in the Sitemap Thing.
s3. We then upload those sitemaps as static files to s3 where we host them.
The Sitemap protocol specifies a hard limit of 50000 links. Since we have
significantly more links than that, we have to define a Sitemap Index
@@ -57,11 +56,11 @@ This module is split into 3 parts.
r2.lib.sitemaps.data - Loads up the raw Subreddit and Link Things.
r2.lib.sitemaps.generate - Transforms the Things into sitemap xml strings.
r2.lib.sitemaps.store - Glue code that makes and stores the sitemaps.
r2.lib.sitemaps.store - Stores the sitemaps on s3.
r2.lib.sitemaps.watcher - Reads from the SQS queue and starts a new upload
The only function that's supposed to be used outside of this module is
r2.lib.sitemaps.store.store_sitemaps which dumps the new
sitemaps into r2.models.sitemap.Sitemap. This is designed to be used
through a daily cron job given the slow speed of the operation.
r2.lib.sitemaps.watcher.watcher. This is designed to be used as a constantly
running daemon.
"""

View File

@@ -25,53 +25,49 @@
Currently only supports subreddit links but will soon support comment links.
"""
import hashlib
import itertools
import tempfile
from boto.s3.connection import S3Connection
from pylons import app_globals as g
from r2.lib.db.operators import asc
from r2.lib.utils import fetch_things2, rate_limited_generator
from r2.models.subreddit import Subreddit
from r2.lib.hadoop_decompress import hadoop_decompress
DB_CHUNK_SIZE = 50000
DB_RATE_LIMIT = DB_CHUNK_SIZE
EXPERIMENT_SUBREDDIT_SITEMAP = 'experiment-subreddit-sitemap'
# number of possible ways a subreddit can be partitioned for the experiment.
EXPERIMENT_BUCKET_COUNT = 20
def _read_subreddit_etl_from_s3(s3path):
s3conn = S3Connection()
bucket = s3conn.get_bucket(s3path.bucket, validate=False)
s3keys = bucket.list(s3path.key)
key_count = 0
for s3key in s3keys:
g.log.info("Importing key %r", s3key)
with tempfile.TemporaryFile(mode='rw+b') as ntf_download:
with tempfile.TemporaryFile(mode='rw+b') as ntf_decompress:
# download it
g.log.debug("Downloading %r", s3key)
s3key.get_contents_to_file(ntf_download)
# decompress it
ntf_download.flush()
ntf_download.seek(0)
g.log.debug("Decompressing %r", s3key)
hadoop_decompress(ntf_download, ntf_decompress)
ntf_decompress.flush()
ntf_decompress.seek(0)
# import it
g.log.debug("Starting import of %r", s3key)
for line in ntf_decompress:
yield line
key_count += 1
if key_count == 0:
raise ValueError('{0} contains no readable keys.'.format(s3path))
def rate_limit_query(query):
return rate_limited_generator(
DB_RATE_LIMIT,
fetch_things2(query, DB_CHUNK_SIZE),
)
def is_part_of_experiment(subreddit):
"""Decide that this subreddit is part of the seo traffic experiment.
At the moment the features system (r2/config/feature/README.md)
is designed to be bucketed on a per user basis. We would like an
experiment that is bucketed by subreddits instead. To do this we
bypass the features system entirely, do the bucketing here, and
communicate our hashing method to the data team.
Much of this logic is borrowed from FeatureState.
"""
key = '_'.join((EXPERIMENT_SUBREDDIT_SITEMAP, subreddit.name))
hashed = hashlib.sha1(key)
bucket = long(hashed.hexdigest(), 16) % EXPERIMENT_BUCKET_COUNT
return bucket == 0
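# For example (subreddit name hypothetical; the bucket is whatever sha1 yields):
#     key = 'experiment-subreddit-sitemap_somesubreddit'
#     long(hashlib.sha1(key).hexdigest(), 16) % EXPERIMENT_BUCKET_COUNT == 0
# would put that subreddit in the experiment, i.e. roughly 1 in 20 subreddits
# get crawlable sitemap entries.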
def is_subreddit_to_crawl(subreddit):
return (subreddit.quarantine == False and
subreddit.over_18 == False and
is_part_of_experiment(subreddit))
def find_all_subreddits():
iterator = rate_limit_query(Subreddit._query(
*[Subreddit.c.type != type_ for type_ in Subreddit.private_types],
sort=asc('_date')))
return itertools.ifilter(is_subreddit_to_crawl, iterator)
def find_all_subreddits(s3path):
for line in _read_subreddit_etl_from_s3(s3path):
_, subreddit, __ = line.split('\x01')
yield subreddit
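Each ETL line is expected to be three ctrl-A (\x01) separated fields with the subreddit name in the middle; a tiny sketch of the parse above (field contents are made up):

line = 'ignored-field\x01pics\x01ignored-field\n'
_, subreddit, __ = line.split('\x01')
# subreddit == 'pics', which generate.py below turns into a /r/pics/ permalink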

View File

@@ -65,6 +65,7 @@ Each sitemap and sitemap index will have 50000 links or fewer.
from lxml import etree
from pylons import app_globals as g
from r2.lib.template_helpers import add_sr
from r2.lib.utils import in_chunks
@@ -72,11 +73,11 @@ SITEMAP_NAMESPACE = "http://www.sitemaps.org/schemas/sitemap/0.9"
LINKS_PER_SITEMAP = 50000
def absolute_url(path):
def _absolute_url(path):
return add_sr(path, force_https=True, sr_path=False)
def stringify_xml(root_element):
def _stringify_xml(root_element):
return etree.tostring(
root_element,
pretty_print=g.debug,
@@ -85,18 +86,19 @@ def stringify_xml(root_element):
)
def subreddit_links(subreddits):
def _subreddit_links(subreddits):
for subreddit in subreddits:
yield absolute_url(subreddit.path)
path = '/r/{0}/'.format(subreddit)
yield _absolute_url(path)
def subreddit_sitemap(subreddits):
urlset = etree.Element('urlset', xmlns=SITEMAP_NAMESPACE)
for link in subreddit_links(subreddits):
def _subreddit_sitemap(subreddits):
urlset = etree.Element('urlset', xmlns=SITEMAP_NAMESPACE)
for link in _subreddit_links(subreddits):
url_elem = etree.SubElement(urlset, 'url')
loc_elem = etree.SubElement(url_elem, 'loc')
loc_elem.text = link
return stringify_xml(urlset)
return _stringify_xml(urlset)
def subreddit_sitemaps(subreddits):
@@ -106,13 +108,15 @@ def subreddit_sitemaps(subreddits):
links according to the sitemap standard.
"""
for subreddit_chunks in in_chunks(subreddits, LINKS_PER_SITEMAP):
yield subreddit_sitemap(subreddit_chunks)
yield _subreddit_sitemap(subreddit_chunks)
def sitemap_index(count):
sm_elem = etree.Element('sitemapindex', xmlns=SITEMAP_NAMESPACE)
sm_elem = etree.Element('sitemapindex', xmlns=SITEMAP_NAMESPACE)
for i in xrange(count):
sitemap_elem = etree.SubElement(sm_elem, 'sitemap')
loc_elem = etree.SubElement(sitemap_elem, 'loc')
loc_elem.text = absolute_url('/subreddit_sitemap?index={0}'.format(i))
return stringify_xml(sm_elem)
url = '{0}/subreddit_sitemap/{1}.xml'.format(
g.sitemap_s3_static_host, i)
loc_elem.text = url
return _stringify_xml(sm_elem)
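To make the output shape concrete, a small usage sketch (subreddit names and the static host are illustrative, and add_sr needs the usual pylons context):

# one urlset per chunk of up to LINKS_PER_SITEMAP subreddit names
sitemaps = list(subreddit_sitemaps(['pics', 'programming']))
# sitemaps[0] is an xml string along the lines of:
#   <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
#     <url><loc>https://www.reddit.com/r/pics/</loc></url>
#     <url><loc>https://www.reddit.com/r/programming/</loc></url>
#   </urlset>
index = sitemap_index(len(sitemaps))
# the index holds one <sitemap><loc> entry per uploaded file, e.g.
#   <loc>https://example-static-host/subreddit_sitemap/0.xml</loc>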

View File

@@ -20,12 +20,78 @@
# Inc. All Rights Reserved.
###############################################################################
from r2.models.sitemap import SitemapUpdater
from r2.lib.sitemaps.data import find_all_subreddits
"""Store sitemaps in s3.
This module uploads all subreddit sitemaps as well as the sitemap index
to s3. The basic idea is that Amazon serves the static sitemaps for us.
The binary data we send to s3 is a gzipped xml file. We also send the
appropriate content-type and content-encoding headers so that clients
interpret it correctly.
The only function expected to be used outside this module is:
store_sitemaps_in_s3(subreddits)
Even though the subreddits are expected to be generated and passed into this
function, the sitemap index is created here. The reasoning is that in order
to create the sitemap index we need to know how many sitemaps we have.
If we simply queried the subreddit iterator for its length then we would
have to load all of the subreddits into memory, which would be ... bad.
"""
import gzip
from StringIO import StringIO
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from pylons import app_globals as g
from r2.lib.sitemaps.generate import subreddit_sitemaps, sitemap_index
def store_sitemaps():
sitemap_updater = SitemapUpdater()
for subreddit_sitemap in subreddit_sitemaps(find_all_subreddits()):
sitemap_updater.add_subreddit_sitemap(subreddit_sitemap)
sitemap_updater.add_sitemap_index(sitemap_index(sitemap_updater.count))
HEADERS = {
'Content-Type': 'text/xml',
'Content-Encoding': 'gzip',
}
def zip_string(string):
zipbuffer = StringIO()
with gzip.GzipFile(mode='w', fileobj=zipbuffer) as f:
f.write(string)
return zipbuffer.getvalue()
def upload_sitemap(key, sitemap):
key.set_contents_from_string(zip_string(sitemap), headers=HEADERS)
def store_subreddit_sitemap(bucket, index, sitemap):
key = Key(bucket)
key.key = 'subreddit_sitemap/{0}.xml'.format(index)
g.log.debug("Uploading %r", key)
upload_sitemap(key, sitemap)
def store_sitemap_index(bucket, count):
key = Key(bucket)
key.key = g.sitemap_subreddit_keyname
g.log.debug("Uploading %r", key)
upload_sitemap(key, sitemap_index(count))
def store_sitemaps_in_s3(subreddits):
s3conn = S3Connection()
bucket = s3conn.get_bucket(g.sitemap_upload_s3_bucket, validate=False)
sitemap_count = 0
for i, sitemap in enumerate(subreddit_sitemaps(subreddits)):
store_subreddit_sitemap(bucket, i, sitemap)
sitemap_count += 1
store_sitemap_index(bucket, sitemap_count)
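Because the objects are stored gzipped with a Content-Encoding header, crawlers and most HTTP clients decompress them transparently; a minimal sketch (bucket and key names hypothetical) of reading one back by hand with boto:

import gzip
from StringIO import StringIO
from boto.s3.connection import S3Connection

def read_sitemap_xml(bucket_name, key_name):
    # Fetch a stored sitemap object from s3 and return the decompressed xml.
    bucket = S3Connection().get_bucket(bucket_name, validate=False)
    raw = bucket.get_key(key_name).get_contents_as_string()
    return gzip.GzipFile(fileobj=StringIO(raw)).read()

# e.g. read_sitemap_xml(g.sitemap_upload_s3_bucket, 'subreddit_sitemap/0.xml')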

View File

@@ -0,0 +1,101 @@
import datetime
import dateutil
import json
import pytz
import time
from boto.s3.connection import S3Connection
from boto.sqs.connection import SQSConnection
from pylons import app_globals as g
from r2.lib.s3_helpers import parse_s3_path
from r2.lib.sitemaps.store import store_sitemaps_in_s3
from r2.lib.sitemaps.data import find_all_subreddits
"""Watch for SQS messages informing us to read, generate, and store sitemaps.
There is only one function that should be used outside this module:
watcher()
It is designed to be used in a daemon process.
"""
def watcher():
"""Poll for new sitemap data and process it as necessary."""
while True:
_process_message()
def _subreddit_sitemap_key():
conn = S3Connection()
bucket = conn.get_bucket(g.sitemap_upload_s3_bucket, validate=False)
return bucket.get_key(g.sitemap_subreddit_keyname)
def _datetime_from_timestamp(timestamp):
return datetime.datetime.fromtimestamp(timestamp / 1000, pytz.utc)
def _before_last_sitemap(timestamp):
sitemap_key = _subreddit_sitemap_key()
if sitemap_key is None:
return False
sitemap_datetime = dateutil.parser.parse(sitemap_key.last_modified)
compare_datetime = _datetime_from_timestamp(timestamp)
return compare_datetime < sitemap_datetime
def _process_message():
if not g.sitemap_sqs_queue:
return
sqs = SQSConnection()
sqs_q = sqs.get_queue(g.sitemap_sqs_queue)
messages = sqs.receive_message(sqs_q, number_messages=1)
if not messages:
return
message, = messages
js = json.loads(message.get_body())
s3path = parse_s3_path(js['location'])
# Some failure modes can leave stale sitemap-creation messages on the
# queue, so skip any message older than the sitemap we last uploaded.
timestamp = js.get('timestamp')
if timestamp is not None and _before_last_sitemap(timestamp):
sqs_q.delete_message(message)
return
g.log.info("Got import job %r", js)
subreddits = find_all_subreddits(s3path)
store_sitemaps_in_s3(subreddits)
sqs_q.delete_message(message)
def _current_timestamp():
return time.time() * 1000
def _create_test_message():
"""A dev only function that drops a new message on the sqs queue."""
sqs = SQSConnection()
sqs_q = sqs.get_queue(g.sitemap_sqs_queue)
# get_queue returns None on failure
assert sqs_q, "failed to connect to queue"
message = sqs_q.new_message(body=json.dumps({
'job_name': 'daily-sr-sitemap-reporting',
'location': ('s3://reddit-data-analysis/big-data/r2/prod/' +
'daily_sr_sitemap_reporting/dt=2016-06-14'),
'timestamp': _current_timestamp(),
}))
sqs_q.write(message)
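A dev-only usage sketch tying these helpers together, assuming g.sitemap_sqs_queue points at a test queue (in production the upstart job below simply runs watcher() forever):

_create_test_message()  # drop a fake ETL-completed message on the queue
_process_message()      # pick it up, rebuild the sitemaps, and upload them to s3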

View File

@@ -1,126 +0,0 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2015 reddit
# Inc. All Rights Reserved.
###############################################################################
from datetime import datetime, timedelta
import uuid
from pycassa.system_manager import ASCII_TYPE, UTF8_TYPE, DATE_TYPE
from r2.lib.db import tdb_cassandra
from r2.lib.db.operators import desc
TTL = timedelta(days=10)
class Sitemap(tdb_cassandra.View):
"""Sitemaps that store the current state of the reddits.
See: http://www.sitemaps.org/protocol.html
We use a method of indirection to store the actual sitemaps. Even
though `Sitemap._retrieve_sitemap(Sitemap.INDEX_KEY)` can be used to get
the sitemap index, it's not the same as `Sitemap._byID(Sitemap.INDEX_KEY)`.
Instead, for every batch update we create a unique subkey.
`Sitemap._byID(Sitemap.INDEX_KEY)` then yields the INDEX_KEY joined with
the current subkey (via its `latest` column), and we use that joined key
to retrieve the actual sitemap.
"""
_use_db = True
_ttl = TTL
INDEX_KEY = 'key'
SUBREDDIT_KEY = 'subreddit_{0}'
@classmethod
def sitemap_index(cls):
"""Find the current sitemap index."""
return cls._retrieve_sitemap(cls.INDEX_KEY)
@classmethod
def subreddit_sitemap(cls, index):
"""Find one of the sitemaps dedicated to subreddit links."""
return cls._retrieve_sitemap(cls._subreddit_key(index))
@classmethod
def add_subreddit_sitemap(cls, sitemap, index, subkey):
key = cls._subreddit_key(index)
joined_key = cls._joined_key(key, subkey)
cls._set_values(joined_key, {'sitemap': sitemap})
cls._set_values(key, {'latest': joined_key})
@classmethod
def add_sitemap_index(cls, sitemap_index, subkey):
joined_key = cls._joined_key(cls.INDEX_KEY, subkey)
cls._set_values(joined_key, {'sitemap': sitemap_index})
cls._set_values(cls.INDEX_KEY, {'latest': joined_key})
@staticmethod
def generate_subkey():
return datetime.now().strftime('%y%m%d%H%M%S')
@classmethod
def _retrieve_sitemap(cls, sitemap_key):
joined_key = cls._byID(sitemap_key).latest
return cls._byID(joined_key).sitemap
@classmethod
def _subreddit_key(cls, index):
return cls.SUBREDDIT_KEY.format(index)
@staticmethod
def _joined_key(key, subkey):
return '_'.join((key, subkey))
class SitemapUpdater(object):
"""A class that facilitates the saving of many sitemaps.
This minimal helper class maintains the state of the subkey as well as the
indices of the various sitemap types.
Usage:
>>> su = SitemapUpdater()
>>> su.add_subreddit_sitemap(subreddit_sitemap_1)
>>> su.add_subreddit_sitemap(subreddit_sitemap_2)
>>> su.add_comment_page_sitemap(comment_page_sitemap)
>>> su.add_sitemap_index(create_sitemap_index(su.count))
>>> su.count # evaluates to 3
"""
def __init__(self):
self._subkey = Sitemap.generate_subkey()
self._subreddit_count = 0
def add_subreddit_sitemap(self, sitemap):
Sitemap.add_subreddit_sitemap(
sitemap, self._subreddit_count, self._subkey)
self._subreddit_count += 1
def add_sitemap_index(self, sitemap_index):
Sitemap.add_sitemap_index(sitemap_index, self._subkey)
@property
def count(self):
# note that sitemap indices don't count towards this count.
return self._subreddit_count

View File

@@ -62,4 +62,4 @@ Disallow: /search
Disallow: /r/*/search
Allow: /
Sitemap: ${add_sr('sitemap', sr_path=False, force_https=True)}
Sitemap: ${thing.subreddit_sitemap}

View File

@@ -0,0 +1,13 @@
description "build sitemaps for almost every link on reddit"
instance $x
stop on reddit-stop or runlevel [016]
respawn
nice 10
script
. /etc/default/reddit
wrap-job paster run --proctitle sitemaps_q$x $REDDIT_INI $REDDIT_ROOT/r2/lib/sitemaps/watcher.py -c 'watcher()'
end script