Drastically reorganize app_globals and time app startup.

This should give us more information on which steps of app startup are
taking the most time so that we can minimize the effects of pushes on
capacity.
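
The pattern this commit wires in: Globals.__init__ creates a named timer via stats.get_timer("app_startup") and calls start(), each phase of setup() records a checkpoint with intermediate(), and setup_complete() calls stop() and flushes the stats. A minimal sketch of such a checkpoint timer follows; the StartupTimer class below is an illustrative stand-in, not the actual r2.lib.stats implementation, which reports timings to statsd:

import time

class StartupTimer(object):
    # illustrative stand-in for stats.get_timer("app_startup");
    # the real implementation sends these timings to statsd
    def __init__(self, name):
        self.name = name
        self.start_time = None
        self.last_checkpoint = None
        self.timings = []  # list of (checkpoint, seconds) pairs

    def start(self):
        self.start_time = self.last_checkpoint = time.time()

    def intermediate(self, checkpoint):
        # record how long the step since the previous checkpoint took,
        # e.g. ("zookeeper", 0.4) or ("cassandra", 1.2)
        now = time.time()
        self.timings.append((checkpoint, now - self.last_checkpoint))
        self.last_checkpoint = now

    def stop(self):
        self.timings.append(("total", self.elapsed_seconds()))

    def elapsed_seconds(self):
        return time.time() - self.start_time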
Neil Williams
2012-11-08 18:54:12 -08:00
parent c8a725758c
commit 929c6bdcd3
2 changed files with 201 additions and 148 deletions


@@ -59,6 +59,7 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True):
    r2.config.cache = g.cache
    g.plugins.load_plugins()
    config['r2.plugins'] = g.plugins
    g.startup_timer.intermediate("plugins")

    config['pylons.h'] = r2.lib.helpers
    config['routes.map'] = routing.make_map()
@@ -67,10 +68,6 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True):
    config['pylons.response_options']['headers'] = {}

    # The following template options are passed to your template engines
    #tmpl_options = {}
    #tmpl_options['myghty.log_errors'] = True
    #tmpl_options['myghty.escapes'] = dict(l=webhelpers.auto_link, s=webhelpers.simple_format)
    tmpl_options = config['buffet.template_options']
    tmpl_options['mako.filesystem_checks'] = getattr(g, 'reload_templates', False)
    tmpl_options['mako.default_filters'] = ["mako_websafe"]
@@ -93,9 +90,5 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True):
    tmpl_options['mako.modulename_callable'] = mako_module_path

    # Add your own template options config options here,
    # note that all config options will override
    # any Pylons config options

    # Return our loaded config object
    #return config.Config(tmpl_options, map, paths)

    if setup_globals:
        g.setup_complete()
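
Condensed, the startup flow the two changed files establish looks like this (a simplified paraphrase of the diff, not verbatim code):

# Globals.__init__       timer.start(); ...; intermediate("init")
# Globals.setup()        intermediate("configuration"), ("zookeeper"),
#                        ("memcache"), ("cassandra"), ("postgres"),
#                        ("cache_chains"), ("revisions")
# load_environment()     intermediate("plugins") after plugin loading, then
#                        g.setup_complete() -> timer.stop(); stats.flush();
#                        log total elapsed time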


@@ -238,6 +238,11 @@ class Globals(object):
        self.config.add_spec(self.spec)
        self.plugins = PluginLoader(self.config.get("plugins", []))

        self.stats = Stats(self.config.get('statsd_addr'),
                           self.config.get('statsd_sample_rate'))
        self.startup_timer = self.stats.get_timer("app_startup")
        self.startup_timer.start()

        self.paths = paths
        self.running_as_script = global_conf.get('running_as_script', False)
@@ -258,6 +263,8 @@ class Globals(object):
        dtz = global_conf.get('display_timezone', tz)
        self.display_tz = pytz.timezone(dtz)

        self.startup_timer.intermediate("init")

    def __getattr__(self, name):
        if not name.startswith('_') and name in self.config:
            return self.config[name]
@@ -267,140 +274,28 @@ class Globals(object):
    def setup(self):
        self.queues = queues.declare_queues(self)

        # heavy load mode is read only mode with a different infobar
        if self.heavy_load_mode:
            self.read_only_mode = True

        ################# CONFIGURATION
        # AMQP is required
        if not self.amqp_host:
            raise ValueError("amqp_host not set in the .ini")

        if hasattr(signal, 'SIGUSR1'):
            # not all platforms have user signals
            signal.signal(signal.SIGUSR1, thread_dump)

        # initialize caches. Any cache-chains built here must be added
        # to cache_chains (closed around by reset_caches) so that they
        # can properly reset their local components
        localcache_cls = (SelfEmptyingCache if self.running_as_script
                          else LocalCache)
        num_mc_clients = self.num_mc_clients

        self.cache_chains = {}

        # for now, zookeeper will be an optional part of the stack.
        # if it's not configured, we will grab the expected config from the
        # [live_config] section of the ini file
        zk_hosts = self.config.get("zookeeper_connection_string")
        if zk_hosts:
            from r2.lib.zookeeper import (connect_to_zookeeper,
                                          LiveConfig, LiveList)
            zk_username = self.config["zookeeper_username"]
            zk_password = self.config["zookeeper_password"]
            self.zookeeper = connect_to_zookeeper(zk_hosts, (zk_username,
                                                             zk_password))
            self.live_config = LiveConfig(self.zookeeper, LIVE_CONFIG_NODE)
            self.throttles = LiveList(self.zookeeper, "/throttles",
                                      map_fn=ipaddress.ip_network,
                                      reduce_fn=ipaddress.collapse_addresses)
        else:
            self.zookeeper = None
            parser = ConfigParser.RawConfigParser()
            parser.read([self.config["__file__"]])
            self.live_config = extract_live_config(parser, self.plugins)
            self.throttles = tuple()  # immutable since it's not real

        self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients)
        self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients)

        self.stats = Stats(self.config.get('statsd_addr'),
                           self.config.get('statsd_sample_rate'))

        event.listens_for(engine.Engine, 'before_cursor_execute')(
            self.stats.pg_before_cursor_execute)
        event.listens_for(engine.Engine, 'after_cursor_execute')(
            self.stats.pg_after_cursor_execute)

        self.make_lock = make_lock_factory(self.lock_cache, self.stats)

        # This requirement doesn't *have* to be a requirement, but there are
        # bugs at the moment that will pop up if you violate it
        # XXX: get rid of these options. new query cache is always on.
        if self.write_query_queue and not self.use_query_cache:
            raise Exception("write_query_queue requires use_query_cache")

        if not self.cassandra_seeds:
            raise ValueError("cassandra_seeds not set in the .ini")

        keyspace = "reddit"
        self.cassandra_pools = {
            "main":
                StatsCollectingConnectionPool(
                    keyspace,
                    stats=self.stats,
                    logging_name="main",
                    server_list=self.cassandra_seeds,
                    pool_size=self.cassandra_pool_size,
                    timeout=2,
                    max_retries=3,
                    prefill=False
                ),
        }

        perma_memcache = (CMemcache(self.permacache_memcaches, num_clients = num_mc_clients)
                          if self.permacache_memcaches
                          else None)
        self.permacache = CassandraCacheChain(localcache_cls(),
                                              CassandraCache('permacache',
                                                             self.cassandra_pools[self.cassandra_default_pool],
                                                             read_consistency_level = self.cassandra_rcl,
                                                             write_consistency_level = self.cassandra_wcl),
                                              memcache = perma_memcache,
                                              lock_factory = self.make_lock)
        self.cache_chains.update(permacache=self.permacache)

        # hardcache is done after the db info is loaded, and then the
        # chains are reset to use the appropriate initial entries

        if self.stalecaches:
            self.cache = StaleCacheChain(localcache_cls(),
                                         CMemcache(self.stalecaches, num_clients=num_mc_clients),
                                         self.memcache)
        else:
            self.cache = MemcacheChain((localcache_cls(), self.memcache))
        self.cache_chains.update(cache=self.cache)

        self.rendercache = MemcacheChain((localcache_cls(),
                                          CMemcache(self.rendercaches,
                                                    noreply=True, no_block=True,
                                                    num_clients = num_mc_clients)))
        self.cache_chains.update(rendercache=self.rendercache)

        self.thing_cache = CacheChain((localcache_cls(),))
        self.cache_chains.update(thing_cache=self.thing_cache)

        #load the database info
        self.dbm = self.load_db_params()

        # can't do this until load_db_params() has been called
        self.hardcache = HardcacheChain((localcache_cls(),
                                         self.memcache,
                                         HardCache(self)),
                                        cache_negative_results = True)
        self.cache_chains.update(hardcache=self.hardcache)

        # I know this sucks, but we need non-request-threads to be
        # able to reset the caches, so we need them be able to close
        # around 'cache_chains' without being able to call getattr on
        # 'g'
        cache_chains = self.cache_chains.copy()
        def reset_caches():
            for name, chain in cache_chains.iteritems():
                chain.reset()
                chain.stats = CacheStats(self.stats, name)

        self.reset_caches = reset_caches
        self.reset_caches()

        self.REDDIT_MAIN = bool(os.environ.get('REDDIT_MAIN'))

        # heavy load mode is read only mode with a different infobar
        if self.heavy_load_mode:
            self.read_only_mode = True

        origin_prefix = self.domain_prefix + "." if self.domain_prefix else ""
        self.origin = "http://" + origin_prefix + self.domain
        self.secure_domains = set([urlparse(self.payment_domain).netloc])

        self.trusted_domains = set([self.domain])
        self.trusted_domains.update(self.authorized_cnames)
        if self.https_endpoint:
@@ -436,9 +331,6 @@ class Globals(object):
print ("Warning: g.media_domain == g.domain. " +
"This may give untrusted content access to user cookies")
self.reddit_host = socket.gethostname()
self.reddit_pid = os.getpid()
for arg in sys.argv:
tokens = arg.split("=")
if len(tokens) == 2:
@@ -446,14 +338,171 @@ class Globals(object):
                self.log.debug("Overriding g.%s to %s" % (k, v))
                setattr(self, k, v)

        #if we're going to use the query_queue, we need amqp
        if self.write_query_queue and not self.amqp_host:
            raise Exception("amqp_host must be defined to use the query queue")

        self.reddit_host = socket.gethostname()
        self.reddit_pid = os.getpid()

        # This requirement doesn't *have* to be a requirement, but there are
        # bugs at the moment that will pop up if you violate it
        if self.write_query_queue and not self.use_query_cache:
            raise Exception("write_query_queue requires use_query_cache")

        if hasattr(signal, 'SIGUSR1'):
            # not all platforms have user signals
            signal.signal(signal.SIGUSR1, thread_dump)

        self.startup_timer.intermediate("configuration")

        ################# ZOOKEEPER
        # for now, zookeeper will be an optional part of the stack.
        # if it's not configured, we will grab the expected config from the
        # [live_config] section of the ini file
        zk_hosts = self.config.get("zookeeper_connection_string")
        if zk_hosts:
            from r2.lib.zookeeper import (connect_to_zookeeper,
                                          LiveConfig, LiveList)
            zk_username = self.config["zookeeper_username"]
            zk_password = self.config["zookeeper_password"]
            self.zookeeper = connect_to_zookeeper(zk_hosts, (zk_username,
                                                             zk_password))
            self.live_config = LiveConfig(self.zookeeper, LIVE_CONFIG_NODE)
            self.throttles = LiveList(self.zookeeper, "/throttles",
                                      map_fn=ipaddress.ip_network,
                                      reduce_fn=ipaddress.collapse_addresses)
        else:
            self.zookeeper = None
            parser = ConfigParser.RawConfigParser()
            parser.read([self.config["__file__"]])
            self.live_config = extract_live_config(parser, self.plugins)
            self.throttles = tuple()  # immutable since it's not real
        self.startup_timer.intermediate("zookeeper")

        ################# MEMCACHE
        num_mc_clients = self.num_mc_clients

        # the main memcache pool. used for most everything.
        self.memcache = CMemcache(self.memcaches, num_clients=num_mc_clients)

        # a smaller pool of caches used only for distributed locks.
        # TODO: move this to ZooKeeper
        self.lock_cache = CMemcache(self.lockcaches,
                                    num_clients=num_mc_clients)
        self.make_lock = make_lock_factory(self.lock_cache, self.stats)

        # memcaches used in front of the permacache CF in cassandra.
        # XXX: this is a legacy thing; permacache was made when C* didn't have
        # a row cache.
        if self.permacache_memcaches:
            permacache_memcaches = CMemcache(self.permacache_memcaches,
                                             num_clients=num_mc_clients)
        else:
            permacache_memcaches = None

        # the stalecache is a memcached local to the current app server used
        # for data that's frequently fetched but doesn't need to be fresh.
        if self.stalecaches:
            stalecaches = CMemcache(self.stalecaches,
                                    num_clients=num_mc_clients)
        else:
            stalecaches = None

        # rendercache holds rendered partial templates as well as fully
        # cached pages.
        rendercaches = CMemcache(
            self.rendercaches,
            noreply=True,
            no_block=True,
            num_clients=num_mc_clients,
        )

        self.startup_timer.intermediate("memcache")

        ################# CASSANDRA
        keyspace = "reddit"
        self.cassandra_pools = {
            "main":
                StatsCollectingConnectionPool(
                    keyspace,
                    stats=self.stats,
                    logging_name="main",
                    server_list=self.cassandra_seeds,
                    pool_size=self.cassandra_pool_size,
                    timeout=2,
                    max_retries=3,
                    prefill=False
                ),
        }

        permacache_cf = CassandraCache(
            'permacache',
            self.cassandra_pools[self.cassandra_default_pool],
            read_consistency_level=self.cassandra_rcl,
            write_consistency_level=self.cassandra_wcl
        )

        self.startup_timer.intermediate("cassandra")

        ################# POSTGRES
        event.listens_for(engine.Engine, 'before_cursor_execute')(
            self.stats.pg_before_cursor_execute)
        event.listens_for(engine.Engine, 'after_cursor_execute')(
            self.stats.pg_after_cursor_execute)

        self.dbm = self.load_db_params()
        self.startup_timer.intermediate("postgres")

        ################# CHAINS
        # initialize caches. Any cache-chains built here must be added
        # to cache_chains (closed around by reset_caches) so that they
        # can properly reset their local components
        self.cache_chains = {}
        localcache_cls = (SelfEmptyingCache if self.running_as_script
                          else LocalCache)

        if stalecaches:
            self.cache = StaleCacheChain(
                localcache_cls(),
                stalecaches,
                self.memcache,
            )
        else:
            self.cache = MemcacheChain((localcache_cls(), self.memcache))
        self.cache_chains.update(cache=self.cache)

        self.rendercache = MemcacheChain((
            localcache_cls(),
            rendercaches,
        ))
        self.cache_chains.update(rendercache=self.rendercache)

        # the thing_cache is used in tdb_cassandra.
        self.thing_cache = CacheChain((localcache_cls(),))
        self.cache_chains.update(thing_cache=self.thing_cache)

        self.permacache = CassandraCacheChain(
            localcache_cls(),
            permacache_cf,
            memcache=permacache_memcaches,
            lock_factory=self.make_lock,
        )
        self.cache_chains.update(permacache=self.permacache)

        # hardcache is used for various things that tend to expire
        # TODO: replace hardcache w/ cassandra stuff
        self.hardcache = HardcacheChain(
            (localcache_cls(), self.memcache, HardCache(self)),
            cache_negative_results=True,
        )
        self.cache_chains.update(hardcache=self.hardcache)

        # I know this sucks, but we need non-request-threads to be
        # able to reset the caches, so we need them be able to close
        # around 'cache_chains' without being able to call getattr on
        # 'g'
        cache_chains = self.cache_chains.copy()
        def reset_caches():
            for name, chain in cache_chains.iteritems():
                chain.reset()
                chain.stats = CacheStats(self.stats, name)

        self.reset_caches = reset_caches
        self.reset_caches()

        self.startup_timer.intermediate("cache_chains")

        # try to set the source control revision numbers
        self.versions = {}
@@ -465,10 +514,21 @@ class Globals(object):
        i18n_git_path = os.path.join(os.path.dirname(I18N_PATH), ".git")
        self.record_repo_version("i18n", i18n_git_path)
        self.startup_timer.intermediate("revisions")

    def setup_complete(self):
        self.startup_timer.stop()
        self.stats.flush()

        if self.log_start:
            self.log.error("reddit app %s:%s started %s at %s" %
                           (self.reddit_host, self.reddit_pid,
                            self.short_version, datetime.now()))
            self.log.error(
                "reddit app %s:%s started %s at %s (took %.02f seconds)",
                self.reddit_host,
                self.reddit_pid,
                self.short_version,
                datetime.now(),
                self.startup_timer.elapsed_seconds()
            )

    def record_repo_version(self, repo_name, git_dir):
        """Get the currently checked out git revision for a given repository,