Create SyncListener. Since they do not rely on Thread, they can be used on Google App Engine.

Signed-off-by: Yehuda Katz <wycats@Yehuda-Katz.local>
This commit is contained in:
José Valim
2009-11-19 14:50:27 -02:00
committed by Yehuda Katz
parent 0f9029ec48
commit 8104f65c32
2 changed files with 74 additions and 35 deletions

View File

@@ -41,7 +41,7 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
mattr_accessor :queue
mattr_accessor :queue, :listener
class << self
delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
@@ -54,8 +54,13 @@ module ActiveSupport
@publisher ||= Publisher.new(queue)
end
def subscribe(pattern=nil, &block)
Subscriber.new(queue).bind(pattern).subscribe(&block)
def subscriber
@subscriber ||= Subscriber.new(queue)
end
def subscribe(pattern=nil, options={}, &block)
with = options[:with] || listener
subscriber.bind(with, pattern).subscribe(&block)
end
end
@@ -104,13 +109,14 @@ module ActiveSupport
@queue = queue
end
def bind(pattern)
@pattern = pattern
def bind(listener, pattern)
@listener = listener
@pattern = pattern
self
end
def subscribe
@queue.subscribe(@pattern) do |*args|
@queue.subscribe(@listener, @pattern) do |*args|
yield(*args)
end
end
@@ -138,6 +144,48 @@ module ActiveSupport
end
end
class AsyncListener
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
@queue = Queue.new
Thread.new { consume }
end
def publish(name, *args)
if !@pattern || @pattern === name.to_s
@queue << args.unshift(name)
end
end
def consume
while args = @queue.shift
@subscriber.call(*args)
end
end
def drained?
@queue.size.zero?
end
end
class SyncListener
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
end
def publish(name, *args)
if !@pattern || @pattern === name.to_s
@subscriber.call(*args.unshift(name))
end
end
def drained?
true
end
end
# This is a default queue implementation that ships with Notifications. It
# consumes events in a thread and publish them to all registered subscribers.
#
@@ -150,40 +198,16 @@ module ActiveSupport
@listeners.each { |l| l.publish(*args) }
end
def subscribe(pattern=nil, &block)
@listeners << Listener.new(pattern, &block)
def subscribe(listener, pattern=nil, &block)
@listeners << listener.new(pattern, &block)
end
def drained?
@listeners.all? &:drained?
end
class Listener
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
@queue = Queue.new
Thread.new { consume }
end
def publish(name, *args)
if !@pattern || @pattern === name.to_s
@queue << args.unshift(name)
end
end
def consume
while args = @queue.shift
@subscriber.call(*args)
end
end
def drained?
@queue.size.zero?
end
end
end
end
Notifications.queue = Notifications::LittleFanout.new
Notifications.queue = Notifications::LittleFanout.new
Notifications.listener = Notifications::AsyncListener
end

View File

@@ -176,6 +176,21 @@ class NotificationsMainTest < Test::Unit::TestCase
assert_equal 1, @another.first.result
end
def test_subscriber_allows_sync_listeners
@another = []
ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args|
@another << ActiveSupport::Notifications::Event.new(*args)
end
Thread.expects(:new).never
ActiveSupport::Notifications.instrument(:something){ 0 }
ActiveSupport::Notifications.instrument(:cache){ 1 }
assert_equal 1, @another.size
assert_equal :cache, @another.first.name
assert_equal 1, @another.first.result
end
def test_with_several_consumers_and_several_events
@another = []
ActiveSupport::Notifications.subscribe do |*args|
@@ -201,6 +216,6 @@ class NotificationsMainTest < Test::Unit::TestCase
private
def drain
sleep(0.1) until ActiveSupport::Notifications.queue.drained?
sleep(0.05) until ActiveSupport::Notifications.queue.drained?
end
end