mirror of
https://github.com/github/rails.git
synced 2026-04-04 03:00:58 -04:00
Initial conversion to connection pool
So far so good, tests still run clean. Next steps: synchronize connection pool access and modification, and change allow_concurrency to simply switch a real lock for a null lock.
This commit is contained in:
@@ -0,0 +1,128 @@
|
||||
module ActiveRecord
|
||||
module ConnectionAdapters
|
||||
class ConnectionPool
|
||||
# Check for activity after at least +verification_timeout+ seconds.
|
||||
# Defaults to 0 (always check.)
|
||||
attr_accessor :verification_timeout
|
||||
attr_reader :active_connections, :spec
|
||||
|
||||
def initialize(spec)
|
||||
@verification_timeout = 0
|
||||
|
||||
# The thread id -> adapter cache.
|
||||
@active_connections = {}
|
||||
|
||||
# The ConnectionSpecification for this pool
|
||||
@spec = spec
|
||||
end
|
||||
|
||||
def active_connection_name #:nodoc:
|
||||
Thread.current.object_id
|
||||
end
|
||||
|
||||
def active_connection
|
||||
active_connections[active_connection_name]
|
||||
end
|
||||
|
||||
# Returns the connection currently associated with the class. This can
|
||||
# also be used to "borrow" the connection to do database work unrelated
|
||||
# to any of the specific Active Records.
|
||||
def connection
|
||||
if conn = active_connections[active_connection_name]
|
||||
conn
|
||||
else
|
||||
# retrieve_connection sets the cache key.
|
||||
conn = retrieve_connection
|
||||
active_connections[active_connection_name] = conn
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes to connections.
|
||||
def clear_active_connections!
|
||||
clear_entries!(@active_connections, [active_connection_name]) do |name, conn|
|
||||
conn.disconnect!
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes
|
||||
def clear_reloadable_connections!
|
||||
@active_connections.each do |name, conn|
|
||||
if conn.requires_reloading?
|
||||
conn.disconnect!
|
||||
@active_connections.delete(name)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Verify active connections.
|
||||
def verify_active_connections! #:nodoc:
|
||||
remove_stale_cached_threads!(@active_connections) do |name, conn|
|
||||
conn.disconnect!
|
||||
end
|
||||
active_connections.each_value do |connection|
|
||||
connection.verify!(@verification_timeout)
|
||||
end
|
||||
end
|
||||
|
||||
def retrieve_connection #:nodoc:
|
||||
# Name is nil if establish_connection hasn't been called for
|
||||
# some class along the inheritance chain up to AR::Base yet.
|
||||
name = active_connection_name
|
||||
if conn = active_connections[name]
|
||||
# Verify the connection.
|
||||
conn.verify!(@verification_timeout)
|
||||
else
|
||||
self.connection = spec
|
||||
conn = active_connections[name]
|
||||
end
|
||||
|
||||
conn or raise ConnectionNotEstablished
|
||||
end
|
||||
|
||||
# Returns true if a connection that's accessible to this class has already been opened.
|
||||
def connected?
|
||||
active_connections[active_connection_name] ? true : false
|
||||
end
|
||||
|
||||
def disconnect!
|
||||
clear_cache!(@active_connections) do |name, conn|
|
||||
conn.disconnect!
|
||||
end
|
||||
end
|
||||
|
||||
# Set the connection for the class.
|
||||
def connection=(spec) #:nodoc:
|
||||
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
|
||||
active_connections[active_connection_name] = spec
|
||||
elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
|
||||
self.connection = ActiveRecord::Base.send(spec.adapter_method, spec.config)
|
||||
else
|
||||
raise ConnectionNotEstablished
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def clear_cache!(cache, &block)
|
||||
cache.each(&block) if block_given?
|
||||
cache.clear
|
||||
end
|
||||
|
||||
# Remove stale threads from the cache.
|
||||
def remove_stale_cached_threads!(cache, &block)
|
||||
stale = Set.new(cache.keys)
|
||||
|
||||
Thread.list.each do |thread|
|
||||
stale.delete(thread.object_id) if thread.alive?
|
||||
end
|
||||
clear_entries!(cache, stale, &block)
|
||||
end
|
||||
|
||||
def clear_entries!(cache, keys, &block)
|
||||
keys.each do |key|
|
||||
block.call(key, cache[key])
|
||||
cache.delete(key)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -14,156 +14,45 @@ module ActiveRecord
|
||||
cattr_accessor :verification_timeout, :instance_writer => false
|
||||
@@verification_timeout = 0
|
||||
|
||||
# The class -> [adapter_method, config] map
|
||||
# The class -> connection pool map
|
||||
@@defined_connections = {}
|
||||
|
||||
# The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency)
|
||||
@@active_connections = {}
|
||||
|
||||
class << self
|
||||
# Retrieve the connection cache.
|
||||
def thread_safe_active_connections #:nodoc:
|
||||
@@active_connections[Thread.current.object_id] ||= {}
|
||||
end
|
||||
|
||||
def single_threaded_active_connections #:nodoc:
|
||||
@@active_connections
|
||||
end
|
||||
|
||||
# pick up the right active_connection method from @@allow_concurrency
|
||||
if @@allow_concurrency
|
||||
alias_method :active_connections, :thread_safe_active_connections
|
||||
else
|
||||
alias_method :active_connections, :single_threaded_active_connections
|
||||
end
|
||||
|
||||
# set concurrency support flag (not thread safe, like most of the methods in this file)
|
||||
def allow_concurrency=(threaded) #:nodoc:
|
||||
logger.debug "allow_concurrency=#{threaded}" if logger
|
||||
return if @@allow_concurrency == threaded
|
||||
clear_all_cached_connections!
|
||||
@@allow_concurrency = threaded
|
||||
method_prefix = threaded ? "thread_safe" : "single_threaded"
|
||||
sing = (class << self; self; end)
|
||||
[:active_connections, :scoped_methods].each do |method|
|
||||
sing.send(:alias_method, method, "#{method_prefix}_#{method}")
|
||||
end
|
||||
log_connections if logger
|
||||
end
|
||||
|
||||
def active_connection_name #:nodoc:
|
||||
@active_connection_name ||=
|
||||
if active_connections[name] || @@defined_connections[name]
|
||||
name
|
||||
elsif self == ActiveRecord::Base
|
||||
nil
|
||||
else
|
||||
superclass.active_connection_name
|
||||
end
|
||||
end
|
||||
|
||||
def clear_active_connection_name #:nodoc:
|
||||
@active_connection_name = nil
|
||||
subclasses.each { |klass| klass.clear_active_connection_name }
|
||||
# for internal use only
|
||||
def active_connections
|
||||
@@defined_connections.inject([]) {|arr,kv| arr << kv.last.active_connection}.compact.uniq
|
||||
end
|
||||
|
||||
# Returns the connection currently associated with the class. This can
|
||||
# also be used to "borrow" the connection to do database work unrelated
|
||||
# to any of the specific Active Records.
|
||||
def connection
|
||||
if defined?(@active_connection_name) && (conn = active_connections[@active_connection_name])
|
||||
conn
|
||||
else
|
||||
# retrieve_connection sets the cache key.
|
||||
conn = retrieve_connection
|
||||
active_connections[@active_connection_name] = conn
|
||||
end
|
||||
retrieve_connection
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes to connections.
|
||||
def clear_active_connections!
|
||||
clear_cache!(@@active_connections) do |name, conn|
|
||||
conn.disconnect!
|
||||
clear_cache!(@@defined_connections) do |name, pool|
|
||||
pool.disconnect!
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes
|
||||
def clear_reloadable_connections!
|
||||
if @@allow_concurrency
|
||||
# With concurrent connections @@active_connections is
|
||||
# a hash keyed by thread id.
|
||||
@@active_connections.each do |thread_id, conns|
|
||||
conns.each do |name, conn|
|
||||
if conn.requires_reloading?
|
||||
conn.disconnect!
|
||||
@@active_connections[thread_id].delete(name)
|
||||
end
|
||||
end
|
||||
end
|
||||
else
|
||||
@@active_connections.each do |name, conn|
|
||||
if conn.requires_reloading?
|
||||
conn.disconnect!
|
||||
@@active_connections.delete(name)
|
||||
end
|
||||
end
|
||||
clear_cache!(@@defined_connections) do |name, pool|
|
||||
pool.clear_reloadable_connections!
|
||||
end
|
||||
end
|
||||
|
||||
# Verify active connections.
|
||||
def verify_active_connections! #:nodoc:
|
||||
if @@allow_concurrency
|
||||
remove_stale_cached_threads!(@@active_connections) do |name, conn|
|
||||
conn.disconnect!
|
||||
end
|
||||
end
|
||||
|
||||
active_connections.each_value do |connection|
|
||||
connection.verify!(@@verification_timeout)
|
||||
end
|
||||
@@defined_connections.each_value {|pool| pool.verify_active_connections!}
|
||||
end
|
||||
|
||||
private
|
||||
def clear_cache!(cache, thread_id = nil, &block)
|
||||
if cache
|
||||
if @@allow_concurrency
|
||||
thread_id ||= Thread.current.object_id
|
||||
thread_cache, cache = cache, cache[thread_id]
|
||||
return unless cache
|
||||
end
|
||||
|
||||
cache.each(&block) if block_given?
|
||||
cache.clear
|
||||
end
|
||||
ensure
|
||||
if thread_cache && @@allow_concurrency
|
||||
thread_cache.delete(thread_id)
|
||||
end
|
||||
end
|
||||
|
||||
# Remove stale threads from the cache.
|
||||
def remove_stale_cached_threads!(cache, &block)
|
||||
stale = Set.new(cache.keys)
|
||||
|
||||
Thread.list.each do |thread|
|
||||
stale.delete(thread.object_id) if thread.alive?
|
||||
end
|
||||
|
||||
stale.each do |thread_id|
|
||||
clear_cache!(cache, thread_id, &block)
|
||||
end
|
||||
end
|
||||
|
||||
def clear_all_cached_connections!
|
||||
if @@allow_concurrency
|
||||
@@active_connections.each_value do |connection_hash_for_thread|
|
||||
connection_hash_for_thread.each_value {|conn| conn.disconnect! }
|
||||
connection_hash_for_thread.clear
|
||||
end
|
||||
else
|
||||
@@active_connections.each_value {|conn| conn.disconnect! }
|
||||
end
|
||||
@@active_connections.clear
|
||||
def clear_cache!(cache, &block)
|
||||
cache.each(&block) if block_given?
|
||||
cache.clear
|
||||
end
|
||||
end
|
||||
|
||||
@@ -208,9 +97,7 @@ module ActiveRecord
|
||||
raise AdapterNotSpecified unless defined? RAILS_ENV
|
||||
establish_connection(RAILS_ENV)
|
||||
when ConnectionSpecification
|
||||
clear_active_connection_name
|
||||
@active_connection_name = name
|
||||
@@defined_connections[name] = spec
|
||||
@@defined_connections[name] = ConnectionAdapters::ConnectionPool.new(spec)
|
||||
when Symbol, String
|
||||
if configuration = configurations[spec.to_s]
|
||||
establish_connection(configuration)
|
||||
@@ -248,26 +135,20 @@ module ActiveRecord
|
||||
# opened and set as the active connection for the class it was defined
|
||||
# for (not necessarily the current class).
|
||||
def self.retrieve_connection #:nodoc:
|
||||
# Name is nil if establish_connection hasn't been called for
|
||||
# some class along the inheritance chain up to AR::Base yet.
|
||||
if name = active_connection_name
|
||||
if conn = active_connections[name]
|
||||
# Verify the connection.
|
||||
conn.verify!(@@verification_timeout)
|
||||
elsif spec = @@defined_connections[name]
|
||||
# Activate this connection specification.
|
||||
klass = name.constantize
|
||||
klass.connection = spec
|
||||
conn = active_connections[name]
|
||||
end
|
||||
end
|
||||
pool = retrieve_connection_pool
|
||||
(pool && pool.connection) or raise ConnectionNotEstablished
|
||||
end
|
||||
|
||||
conn or raise ConnectionNotEstablished
|
||||
def self.retrieve_connection_pool
|
||||
pool = @@defined_connections[name]
|
||||
return pool if pool
|
||||
return nil if ActiveRecord::Base == self
|
||||
superclass.retrieve_connection_pool
|
||||
end
|
||||
|
||||
# Returns true if a connection that's accessible to this class has already been opened.
|
||||
def self.connected?
|
||||
active_connections[active_connection_name] ? true : false
|
||||
retrieve_connection_pool.connected?
|
||||
end
|
||||
|
||||
# Remove the connection for this class. This will close the active
|
||||
@@ -275,35 +156,10 @@ module ActiveRecord
|
||||
# can be used as an argument for establish_connection, for easily
|
||||
# re-establishing the connection.
|
||||
def self.remove_connection(klass=self)
|
||||
spec = @@defined_connections[klass.name]
|
||||
konn = active_connections[klass.name]
|
||||
@@defined_connections.delete_if { |key, value| value == spec }
|
||||
active_connections.delete_if { |key, value| value == konn }
|
||||
konn.disconnect! if konn
|
||||
spec.config if spec
|
||||
end
|
||||
|
||||
# Set the connection for the class.
|
||||
def self.connection=(spec) #:nodoc:
|
||||
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
|
||||
active_connections[name] = spec
|
||||
elsif spec.kind_of?(ConnectionSpecification)
|
||||
config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
|
||||
self.connection = self.send(spec.adapter_method, config)
|
||||
elsif spec.nil?
|
||||
raise ConnectionNotEstablished
|
||||
else
|
||||
establish_connection spec
|
||||
end
|
||||
end
|
||||
|
||||
# connection state logging
|
||||
def self.log_connections #:nodoc:
|
||||
if logger
|
||||
logger.info "Defined connections: #{@@defined_connections.inspect}"
|
||||
logger.info "Active connections: #{active_connections.inspect}"
|
||||
logger.info "Active connection name: #{@active_connection_name}"
|
||||
end
|
||||
pool = @@defined_connections[klass.name]
|
||||
@@defined_connections.delete_if { |key, value| value == pool }
|
||||
pool.disconnect! if pool
|
||||
pool.spec.config if pool
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
1
activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
Normal file → Executable file
1
activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
Normal file → Executable file
@@ -8,6 +8,7 @@ require 'active_record/connection_adapters/abstract/schema_statements'
|
||||
require 'active_record/connection_adapters/abstract/database_statements'
|
||||
require 'active_record/connection_adapters/abstract/quoting'
|
||||
require 'active_record/connection_adapters/abstract/connection_specification'
|
||||
require 'active_record/connection_adapters/abstract/connection_pool'
|
||||
require 'active_record/connection_adapters/abstract/query_cache'
|
||||
|
||||
module ActiveRecord
|
||||
|
||||
@@ -8,41 +8,32 @@ unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name
|
||||
|
||||
fixtures :topics
|
||||
|
||||
def setup
|
||||
@connection = ActiveRecord::Base.remove_connection
|
||||
@connections = []
|
||||
@allow_concurrency = ActiveRecord::Base.allow_concurrency
|
||||
end
|
||||
def setup
|
||||
@connection = ActiveRecord::Base.remove_connection
|
||||
@connections = []
|
||||
end
|
||||
|
||||
def teardown
|
||||
# clear the connection cache
|
||||
ActiveRecord::Base.send(:clear_all_cached_connections!)
|
||||
# set allow_concurrency to saved value
|
||||
ActiveRecord::Base.allow_concurrency = @allow_concurrency
|
||||
# reestablish old connection
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
end
|
||||
def teardown
|
||||
# clear the connection cache
|
||||
ActiveRecord::Base.clear_active_connections!
|
||||
# reestablish old connection
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
end
|
||||
|
||||
def gather_connections(use_threaded_connections)
|
||||
ActiveRecord::Base.allow_concurrency = use_threaded_connections
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
def gather_connections
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
|
||||
5.times do
|
||||
Thread.new do
|
||||
Topic.find :first
|
||||
@connections << ActiveRecord::Base.active_connections.values.first
|
||||
@connections << ActiveRecord::Base.active_connections.first
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
def test_threaded_connections
|
||||
gather_connections(true)
|
||||
assert_equal @connections.uniq.length, 5
|
||||
end
|
||||
|
||||
def test_unthreaded_connections
|
||||
gather_connections(false)
|
||||
assert_equal @connections.uniq.length, 1
|
||||
gather_connections
|
||||
assert_equal @connections.length, 5
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user