diff --git a/r2/r2/config/environment.py b/r2/r2/config/environment.py index 0aba87e5a..c276a8dbc 100644 --- a/r2/r2/config/environment.py +++ b/r2/r2/config/environment.py @@ -59,6 +59,7 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True): r2.config.cache = g.cache g.plugins.load_plugins() config['r2.plugins'] = g.plugins + g.startup_timer.intermediate("plugins") config['pylons.h'] = r2.lib.helpers config['routes.map'] = routing.make_map() @@ -67,10 +68,6 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True): config['pylons.response_options']['headers'] = {} # The following template options are passed to your template engines - #tmpl_options = {} - #tmpl_options['myghty.log_errors'] = True - #tmpl_options['myghty.escapes'] = dict(l=webhelpers.auto_link, s=webhelpers.simple_format) - tmpl_options = config['buffet.template_options'] tmpl_options['mako.filesystem_checks'] = getattr(g, 'reload_templates', False) tmpl_options['mako.default_filters'] = ["mako_websafe"] @@ -93,9 +90,5 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True): tmpl_options['mako.modulename_callable'] = mako_module_path - # Add your own template options config options here, - # note that all config options will override - # any Pylons config options - - # Return our loaded config object - #return config.Config(tmpl_options, map, paths) + if setup_globals: + g.setup_complete() diff --git a/r2/r2/lib/app_globals.py b/r2/r2/lib/app_globals.py index 2e122dbd4..1b2b3a549 100755 --- a/r2/r2/lib/app_globals.py +++ b/r2/r2/lib/app_globals.py @@ -238,6 +238,11 @@ class Globals(object): self.config.add_spec(self.spec) self.plugins = PluginLoader(self.config.get("plugins", [])) + self.stats = Stats(self.config.get('statsd_addr'), + self.config.get('statsd_sample_rate')) + self.startup_timer = self.stats.get_timer("app_startup") + self.startup_timer.start() + self.paths = paths self.running_as_script = global_conf.get('running_as_script', False) @@ -258,6 +263,8 @@ class Globals(object): dtz = global_conf.get('display_timezone', tz) self.display_tz = pytz.timezone(dtz) + self.startup_timer.intermediate("init") + def __getattr__(self, name): if not name.startswith('_') and name in self.config: return self.config[name] @@ -267,140 +274,28 @@ class Globals(object): def setup(self): self.queues = queues.declare_queues(self) - # heavy load mode is read only mode with a different infobar - if self.heavy_load_mode: - self.read_only_mode = True + ################# CONFIGURATION + # AMQP is required + if not self.amqp_host: + raise ValueError("amqp_host not set in the .ini") - if hasattr(signal, 'SIGUSR1'): - # not all platforms have user signals - signal.signal(signal.SIGUSR1, thread_dump) - - # initialize caches. Any cache-chains built here must be added - # to cache_chains (closed around by reset_caches) so that they - # can properly reset their local components - - localcache_cls = (SelfEmptyingCache if self.running_as_script - else LocalCache) - num_mc_clients = self.num_mc_clients - - self.cache_chains = {} - - # for now, zookeeper will be an optional part of the stack. 
- # if it's not configured, we will grab the expected config from the - # [live_config] section of the ini file - zk_hosts = self.config.get("zookeeper_connection_string") - if zk_hosts: - from r2.lib.zookeeper import (connect_to_zookeeper, - LiveConfig, LiveList) - zk_username = self.config["zookeeper_username"] - zk_password = self.config["zookeeper_password"] - self.zookeeper = connect_to_zookeeper(zk_hosts, (zk_username, - zk_password)) - self.live_config = LiveConfig(self.zookeeper, LIVE_CONFIG_NODE) - self.throttles = LiveList(self.zookeeper, "/throttles", - map_fn=ipaddress.ip_network, - reduce_fn=ipaddress.collapse_addresses) - else: - self.zookeeper = None - parser = ConfigParser.RawConfigParser() - parser.read([self.config["__file__"]]) - self.live_config = extract_live_config(parser, self.plugins) - self.throttles = tuple() # immutable since it's not real - - self.memcache = CMemcache(self.memcaches, num_clients = num_mc_clients) - self.lock_cache = CMemcache(self.lockcaches, num_clients=num_mc_clients) - - self.stats = Stats(self.config.get('statsd_addr'), - self.config.get('statsd_sample_rate')) - - event.listens_for(engine.Engine, 'before_cursor_execute')( - self.stats.pg_before_cursor_execute) - event.listens_for(engine.Engine, 'after_cursor_execute')( - self.stats.pg_after_cursor_execute) - - self.make_lock = make_lock_factory(self.lock_cache, self.stats) + # This requirement doesn't *have* to be a requirement, but there are + # bugs at the moment that will pop up if you violate it + # XXX: get rid of these options. new query cache is always on. + if self.write_query_queue and not self.use_query_cache: + raise Exception("write_query_queue requires use_query_cache") if not self.cassandra_seeds: raise ValueError("cassandra_seeds not set in the .ini") - - keyspace = "reddit" - self.cassandra_pools = { - "main": - StatsCollectingConnectionPool( - keyspace, - stats=self.stats, - logging_name="main", - server_list=self.cassandra_seeds, - pool_size=self.cassandra_pool_size, - timeout=2, - max_retries=3, - prefill=False - ), - } - - perma_memcache = (CMemcache(self.permacache_memcaches, num_clients = num_mc_clients) - if self.permacache_memcaches - else None) - self.permacache = CassandraCacheChain(localcache_cls(), - CassandraCache('permacache', - self.cassandra_pools[self.cassandra_default_pool], - read_consistency_level = self.cassandra_rcl, - write_consistency_level = self.cassandra_wcl), - memcache = perma_memcache, - lock_factory = self.make_lock) - - self.cache_chains.update(permacache=self.permacache) - - # hardcache is done after the db info is loaded, and then the - # chains are reset to use the appropriate initial entries - - if self.stalecaches: - self.cache = StaleCacheChain(localcache_cls(), - CMemcache(self.stalecaches, num_clients=num_mc_clients), - self.memcache) - else: - self.cache = MemcacheChain((localcache_cls(), self.memcache)) - self.cache_chains.update(cache=self.cache) - - self.rendercache = MemcacheChain((localcache_cls(), - CMemcache(self.rendercaches, - noreply=True, no_block=True, - num_clients = num_mc_clients))) - self.cache_chains.update(rendercache=self.rendercache) - - self.thing_cache = CacheChain((localcache_cls(),)) - self.cache_chains.update(thing_cache=self.thing_cache) - - #load the database info - self.dbm = self.load_db_params() - - # can't do this until load_db_params() has been called - self.hardcache = HardcacheChain((localcache_cls(), - self.memcache, - HardCache(self)), - cache_negative_results = True) - 
-        self.cache_chains.update(hardcache=self.hardcache)
-
-        # I know this sucks, but we need non-request-threads to be
-        # able to reset the caches, so we need them be able to close
-        # around 'cache_chains' without being able to call getattr on
-        # 'g'
-        cache_chains = self.cache_chains.copy()
-        def reset_caches():
-            for name, chain in cache_chains.iteritems():
-                chain.reset()
-                chain.stats = CacheStats(self.stats, name)
-
-        self.reset_caches = reset_caches
-        self.reset_caches()
-
-        self.REDDIT_MAIN = bool(os.environ.get('REDDIT_MAIN'))
+        # heavy load mode is read only mode with a different infobar
+        if self.heavy_load_mode:
+            self.read_only_mode = True
 
         origin_prefix = self.domain_prefix + "." if self.domain_prefix else ""
         self.origin = "http://" + origin_prefix + self.domain
         self.secure_domains = set([urlparse(self.payment_domain).netloc])
-        
+
         self.trusted_domains = set([self.domain])
         self.trusted_domains.update(self.authorized_cnames)
         if self.https_endpoint:
@@ -436,9 +331,6 @@ class Globals(object):
             print ("Warning: g.media_domain == g.domain. " +
                    "This may give untrusted content access to user cookies")
 
-        self.reddit_host = socket.gethostname()
-        self.reddit_pid = os.getpid()
-
         for arg in sys.argv:
             tokens = arg.split("=")
             if len(tokens) == 2:
@@ -446,14 +338,171 @@ class Globals(object):
                 self.log.debug("Overriding g.%s to %s" % (k, v))
                 setattr(self, k, v)
 
-        #if we're going to use the query_queue, we need amqp
-        if self.write_query_queue and not self.amqp_host:
-            raise Exception("amqp_host must be defined to use the query queue")
+        self.reddit_host = socket.gethostname()
+        self.reddit_pid = os.getpid()
 
-        # This requirement doesn't *have* to be a requirement, but there are
-        # bugs at the moment that will pop up if you violate it
-        if self.write_query_queue and not self.use_query_cache:
-            raise Exception("write_query_queue requires use_query_cache")
+        if hasattr(signal, 'SIGUSR1'):
+            # not all platforms have user signals
+            signal.signal(signal.SIGUSR1, thread_dump)
+
+        self.startup_timer.intermediate("configuration")
+
+        ################# ZOOKEEPER
+        # for now, zookeeper will be an optional part of the stack.
+        # if it's not configured, we will grab the expected config from the
+        # [live_config] section of the ini file
+        zk_hosts = self.config.get("zookeeper_connection_string")
+        if zk_hosts:
+            from r2.lib.zookeeper import (connect_to_zookeeper,
+                                          LiveConfig, LiveList)
+            zk_username = self.config["zookeeper_username"]
+            zk_password = self.config["zookeeper_password"]
+            self.zookeeper = connect_to_zookeeper(zk_hosts, (zk_username,
+                                                             zk_password))
+            self.live_config = LiveConfig(self.zookeeper, LIVE_CONFIG_NODE)
+            self.throttles = LiveList(self.zookeeper, "/throttles",
+                                      map_fn=ipaddress.ip_network,
+                                      reduce_fn=ipaddress.collapse_addresses)
+        else:
+            self.zookeeper = None
+            parser = ConfigParser.RawConfigParser()
+            parser.read([self.config["__file__"]])
+            self.live_config = extract_live_config(parser, self.plugins)
+            self.throttles = tuple() # immutable since it's not real
+        self.startup_timer.intermediate("zookeeper")
+
+        ################# MEMCACHE
+        num_mc_clients = self.num_mc_clients
+
+        # the main memcache pool. used for most everything.
+        self.memcache = CMemcache(self.memcaches, num_clients=num_mc_clients)
+
+        # a smaller pool of caches used only for distributed locks.
+ # TODO: move this to ZooKeeper + self.lock_cache = CMemcache(self.lockcaches, + num_clients=num_mc_clients) + self.make_lock = make_lock_factory(self.lock_cache, self.stats) + + # memcaches used in front of the permacache CF in cassandra. + # XXX: this is a legacy thing; permacache was made when C* didn't have + # a row cache. + if self.permacache_memcaches: + permacache_memcaches = CMemcache(self.permacache_memcaches, + num_clients=num_mc_clients) + else: + permacache_memcaches = None + + # the stalecache is a memcached local to the current app server used + # for data that's frequently fetched but doesn't need to be fresh. + if self.stalecaches: + stalecaches = CMemcache(self.stalecaches, + num_clients=num_mc_clients) + else: + stalecaches = None + + # rendercache holds rendered partial templates as well as fully + # cached pages. + rendercaches = CMemcache( + self.rendercaches, + noreply=True, + no_block=True, + num_clients=num_mc_clients, + ) + + self.startup_timer.intermediate("memcache") + + ################# CASSANDRA + keyspace = "reddit" + self.cassandra_pools = { + "main": + StatsCollectingConnectionPool( + keyspace, + stats=self.stats, + logging_name="main", + server_list=self.cassandra_seeds, + pool_size=self.cassandra_pool_size, + timeout=2, + max_retries=3, + prefill=False + ), + } + + permacache_cf = CassandraCache( + 'permacache', + self.cassandra_pools[self.cassandra_default_pool], + read_consistency_level=self.cassandra_rcl, + write_consistency_level=self.cassandra_wcl + ) + + self.startup_timer.intermediate("cassandra") + + ################# POSTGRES + event.listens_for(engine.Engine, 'before_cursor_execute')( + self.stats.pg_before_cursor_execute) + event.listens_for(engine.Engine, 'after_cursor_execute')( + self.stats.pg_after_cursor_execute) + + self.dbm = self.load_db_params() + self.startup_timer.intermediate("postgres") + + ################# CHAINS + # initialize caches. Any cache-chains built here must be added + # to cache_chains (closed around by reset_caches) so that they + # can properly reset their local components + self.cache_chains = {} + localcache_cls = (SelfEmptyingCache if self.running_as_script + else LocalCache) + + if stalecaches: + self.cache = StaleCacheChain( + localcache_cls(), + stalecaches, + self.memcache, + ) + else: + self.cache = MemcacheChain((localcache_cls(), self.memcache)) + self.cache_chains.update(cache=self.cache) + + self.rendercache = MemcacheChain(( + localcache_cls(), + rendercaches, + )) + self.cache_chains.update(rendercache=self.rendercache) + + # the thing_cache is used in tdb_cassandra. 
+        self.thing_cache = CacheChain((localcache_cls(),))
+        self.cache_chains.update(thing_cache=self.thing_cache)
+
+        self.permacache = CassandraCacheChain(
+            localcache_cls(),
+            permacache_cf,
+            memcache=permacache_memcaches,
+            lock_factory=self.make_lock,
+        )
+        self.cache_chains.update(permacache=self.permacache)
+
+        # hardcache is used for various things that tend to expire
+        # TODO: replace hardcache w/ cassandra stuff
+        self.hardcache = HardcacheChain(
+            (localcache_cls(), self.memcache, HardCache(self)),
+            cache_negative_results=True,
+        )
+        self.cache_chains.update(hardcache=self.hardcache)
+
+        # I know this sucks, but we need non-request-threads to be
+        # able to reset the caches, so we need them to be able to close
+        # around 'cache_chains' without being able to call getattr on
+        # 'g'
+        cache_chains = self.cache_chains.copy()
+        def reset_caches():
+            for name, chain in cache_chains.iteritems():
+                chain.reset()
+                chain.stats = CacheStats(self.stats, name)
+
+        self.reset_caches = reset_caches
+        self.reset_caches()
+
+        self.startup_timer.intermediate("cache_chains")
 
         # try to set the source control revision numbers
         self.versions = {}
@@ -465,10 +514,21 @@ class Globals(object):
         i18n_git_path = os.path.join(os.path.dirname(I18N_PATH), ".git")
         self.record_repo_version("i18n", i18n_git_path)
 
+        self.startup_timer.intermediate("revisions")
+
+    def setup_complete(self):
+        self.startup_timer.stop()
+        self.stats.flush()
+
         if self.log_start:
-            self.log.error("reddit app %s:%s started %s at %s" %
-                           (self.reddit_host, self.reddit_pid,
-                            self.short_version, datetime.now()))
+            self.log.error(
+                "reddit app %s:%s started %s at %s (took %.02f seconds)",
+                self.reddit_host,
+                self.reddit_pid,
+                self.short_version,
+                datetime.now(),
+                self.startup_timer.elapsed_seconds()
+            )
 
     def record_repo_version(self, repo_name, git_dir):
         """Get the currently checked out git revision for a given repository,
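
Notes on a few pieces the patch relies on but does not define in this diff:

The startup_timer threaded through load_environment() and Globals.__init__() needs only a small interface: start(), intermediate(name) to record a named checkpoint, stop(), and elapsed_seconds(). The real timer comes from r2.lib.stats and reports to statsd; the sketch below is a hypothetical stand-in to show the shape of that interface, not reddit's actual implementation.

    import time

    class StartupTimer(object):
        """Hypothetical stand-in for the object Stats.get_timer() returns."""

        def __init__(self, name):
            self.name = name
            self._start = self._last = self._end = None

        def start(self):
            self._start = self._last = time.time()

        def intermediate(self, step):
            # report the time since the previous checkpoint as "<name>.<step>"
            now = time.time()
            print("%s.%s: %.3f seconds" % (self.name, step, now - self._last))
            self._last = now

        def stop(self):
            self._end = time.time()

        def elapsed_seconds(self):
            return self._end - self._start

This is presumably why environment.py only calls g.setup_complete() when setup_globals is true: the timer is started in Globals.__init__ and should be stopped exactly once, after the final intermediate() checkpoint.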
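
The throttle LiveList (removed at the top of setup() and re-added verbatim under ZOOKEEPER) stores raw strings in ZooKeeper and normalizes them client-side: map_fn parses each entry, reduce_fn merges the results. Both helpers come from the ipaddress module (the backport, on Python 2); the addresses below are made up for illustration.

    import ipaddress

    # map_fn: parse each raw string into a network object. the ipaddress
    # backport requires unicode input, hence the u prefixes.
    raw = [u"10.0.0.0/24", u"10.0.1.0/24", u"192.168.1.1"]
    networks = [ipaddress.ip_network(addr) for addr in raw]

    # reduce_fn: collapse adjacent/overlapping networks into a minimal set
    print(list(ipaddress.collapse_addresses(networks)))
    # -> [IPv4Network('10.0.0.0/23'), IPv4Network('192.168.1.1/32')]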
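
make_lock_factory() and the dedicated lock_cache aren't shown here either. Memcached-backed locks conventionally hinge on add() being atomic: it succeeds only if the key does not already exist. A rough sketch of that pattern, under the assumption that reddit's factory works roughly this way (key format, polling interval, and timeout are illustrative, not taken from the codebase):

    import time
    import uuid

    class MemcacheLock(object):
        """Sketch of a memcached-backed mutex; not reddit's implementation."""

        def __init__(self, cache, name, timeout=30):
            self.cache = cache  # expects memcached-style add()/delete()
            self.key = "lock:" + name
            self.timeout = timeout
            self.token = uuid.uuid4().hex

        def __enter__(self):
            # add() fails atomically if another process holds the key; the
            # TTL keeps a crashed holder from wedging everyone forever
            while not self.cache.add(self.key, self.token, time=self.timeout):
                time.sleep(0.01)
            return self

        def __exit__(self, exc_type, exc_value, tb):
            self.cache.delete(self.key)

    def make_lock_factory(cache, stats):
        # stats is accepted to mirror the call site above; a real factory
        # would presumably time lock acquisition with it
        return lambda name: MemcacheLock(cache, name)

The "TODO: move this to ZooKeeper" comment in the diff fits this picture: ZooKeeper's ephemeral nodes give the same mutual exclusion without the TTL guesswork.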
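
The two event.listens_for() registrations under POSTGRES use SQLAlchemy's real cursor-execute hooks, the standard way to time every query. The pg_before_cursor_execute/pg_after_cursor_execute methods aren't shown, but the pattern they implement presumably looks like this (conn.info requires a newer SQLAlchemy than reddit used at the time, and the print stands in for a statsd call):

    import time

    from sqlalchemy import event
    from sqlalchemy.engine import Engine

    @event.listens_for(Engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters,
                              context, executemany):
        # stash the start time on the connection; popped by the after hook
        conn.info.setdefault("query_start", []).append(time.time())

    @event.listens_for(Engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters,
                             context, executemany):
        elapsed = time.time() - conn.info["query_start"].pop()
        print("query took %.4f seconds" % elapsed)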
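
record_repo_version() is truncated above. For reference, resolving a checkout's current revision without shelling out to git is typically a read of .git/HEAD; this hypothetical helper shows the idea (it ignores packed refs and submodule gitdir redirection, which a robust version would need to handle):

    import os

    def git_revision(git_dir):
        # HEAD holds either a bare commit hash (detached HEAD) or a
        # symbolic ref like "ref: refs/heads/master"
        with open(os.path.join(git_dir, "HEAD")) as f:
            head = f.read().strip()
        if head.startswith("ref: "):
            with open(os.path.join(git_dir, head[len("ref: "):])) as f:
                return f.read().strip()
        return head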