mirror of
https://github.com/github/rails.git
synced 2026-04-26 03:00:59 -04:00
Abstract publishing, subscribing and instrumenting in Orchestra.
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
require 'thread'
|
||||
require 'active_support/core_ext/module/delegation'
|
||||
|
||||
module ActiveSupport
|
||||
# Orchestra provides an instrumentation API for Ruby. To instrument an action
|
||||
@@ -31,22 +32,65 @@ module ActiveSupport
|
||||
module Orchestra
|
||||
mattr_accessor :queue
|
||||
|
||||
@stacked_events = Hash.new { |h,k| h[k] = [] }
|
||||
class << self
|
||||
delegate :instrument, :to => :instrumenter
|
||||
|
||||
def self.instrument(name, payload=nil)
|
||||
stack = @stacked_events[Thread.current.object_id]
|
||||
event = Event.new(name, stack.last, payload)
|
||||
stack << event
|
||||
event.result = yield
|
||||
event
|
||||
ensure
|
||||
event.finish!
|
||||
stack.delete(event)
|
||||
queue.push(event)
|
||||
def instrumenter
|
||||
Thread.current[:orchestra_instrumeter] ||= Instrumenter.new(publisher)
|
||||
end
|
||||
|
||||
def publisher
|
||||
@publisher ||= Publisher.new(queue)
|
||||
end
|
||||
|
||||
def subscribe(pattern=nil, &block)
|
||||
Subscriber.new(queue).bind(pattern).subscribe(&block)
|
||||
end
|
||||
end
|
||||
|
||||
def self.subscribe(pattern=nil, &block)
|
||||
queue.subscribe(pattern, &block)
|
||||
class Instrumenter
|
||||
def initialize(publisher)
|
||||
@publisher = publisher
|
||||
@stack = []
|
||||
end
|
||||
|
||||
def instrument(name, payload=nil)
|
||||
event = Event.new(name, @stack.last, payload)
|
||||
@stack << event
|
||||
event.result = yield
|
||||
event
|
||||
ensure
|
||||
event.finish!
|
||||
@stack.pop
|
||||
@publisher.publish(event)
|
||||
end
|
||||
end
|
||||
|
||||
class Publisher
|
||||
def initialize(queue)
|
||||
@queue = queue
|
||||
end
|
||||
|
||||
def publish(event)
|
||||
@queue.publish(event)
|
||||
end
|
||||
end
|
||||
|
||||
class Subscriber
|
||||
def initialize(queue)
|
||||
@queue = queue
|
||||
end
|
||||
|
||||
def bind(pattern)
|
||||
@pattern = pattern
|
||||
self
|
||||
end
|
||||
|
||||
def subscribe
|
||||
@queue.subscribe(@pattern) do |event|
|
||||
yield event
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Event
|
||||
@@ -80,7 +124,7 @@ module ActiveSupport
|
||||
end
|
||||
end
|
||||
|
||||
def push(event)
|
||||
def publish(event)
|
||||
@stream.push(event)
|
||||
@thread.run
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user