Move Shared Packages Into Async/ (#9620)

* async packages

* change pkg

* build

* working

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Raul Jordan
2021-09-18 12:26:11 -05:00
committed by GitHub
parent 7dadc780b8
commit 11a1f681e0
107 changed files with 158 additions and 187 deletions

29
async/event/BUILD.bazel Normal file
View File

@@ -0,0 +1,29 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"feed.go",
"subscription.go",
],
importpath = "github.com/prysmaticlabs/prysm/async/event",
visibility = ["//visibility:public"],
deps = ["//time/mclock:go_default_library"],
)
go_test(
name = "go_default_test",
size = "small",
srcs = [
"example_feed_test.go",
"example_scope_test.go",
"example_subscription_test.go",
"feed_test.go",
"subscription_test.go",
],
embed = [":go_default_library"],
deps = [
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
],
)

View File

@@ -0,0 +1,73 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event_test
import (
"fmt"
"github.com/prysmaticlabs/prysm/async/event"
)
func ExampleFeed_acknowledgedEvents() {
// This example shows how the return value of Send can be used for request/reply
// interaction between event consumers and producers.
var feed event.Feed
type ackedEvent struct {
i int
ack chan<- struct{}
}
// Consumers wait for events on the feed and acknowledge processing.
done := make(chan struct{})
defer close(done)
for i := 0; i < 3; i++ {
ch := make(chan ackedEvent, 100)
sub := feed.Subscribe(ch)
go func() {
defer sub.Unsubscribe()
for {
select {
case ev := <-ch:
fmt.Println(ev.i) // "process" the event
ev.ack <- struct{}{}
case <-done:
return
}
}
}()
}
// The producer sends values of type ackedEvent with increasing values of i.
// It waits for all consumers to acknowledge before sending the next event.
for i := 0; i < 3; i++ {
acksignal := make(chan struct{})
n := feed.Send(ackedEvent{i, acksignal})
for ack := 0; ack < n; ack++ {
<-acksignal
}
}
// Output:
// 0
// 0
// 0
// 1
// 1
// 1
// 2
// 2
// 2
}

View File

@@ -0,0 +1,128 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event_test
import (
"fmt"
"sync"
"github.com/prysmaticlabs/prysm/async/event"
)
// This example demonstrates how SubscriptionScope can be used to control the lifetime of
// subscriptions.
//
// Our example program consists of two servers, each of which performs a calculation when
// requested. The servers also allow subscribing to results of all computations.
type divServer struct{ results event.Feed }
type mulServer struct{ results event.Feed }
func (s *divServer) do(a, b int) int {
r := a / b
s.results.Send(r)
return r
}
func (s *mulServer) do(a, b int) int {
r := a * b
s.results.Send(r)
return r
}
// The servers are contained in an App. The app controls the servers and exposes them
// through its API.
type App struct {
divServer
mulServer
scope event.SubscriptionScope
}
func (s *App) Calc(op byte, a, b int) int {
switch op {
case '/':
return s.divServer.do(a, b)
case '*':
return s.mulServer.do(a, b)
default:
panic("invalid op")
}
}
// The app's SubscribeResults method starts sending calculation results to the given
// channel. Subscriptions created through this method are tied to the lifetime of the App
// because they are registered in the scope.
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
switch op {
case '/':
return s.scope.Track(s.divServer.results.Subscribe(ch))
case '*':
return s.scope.Track(s.mulServer.results.Subscribe(ch))
default:
panic("invalid op")
}
}
// Stop stops the App, closing all subscriptions created through SubscribeResults.
func (s *App) Stop() {
s.scope.Close()
}
func ExampleSubscriptionScope() {
// Create the app.
var (
app App
wg sync.WaitGroup
divs = make(chan int)
muls = make(chan int)
)
// Run a subscriber in the background.
divsub := app.SubscribeResults('/', divs)
mulsub := app.SubscribeResults('*', muls)
wg.Add(1)
go func() {
defer wg.Done()
defer fmt.Println("subscriber exited")
defer divsub.Unsubscribe()
defer mulsub.Unsubscribe()
for {
select {
case result := <-divs:
fmt.Println("division happened:", result)
case result := <-muls:
fmt.Println("multiplication happened:", result)
case <-divsub.Err():
return
case <-mulsub.Err():
return
}
}
}()
// Interact with the app.
app.Calc('/', 22, 11)
app.Calc('*', 3, 4)
// Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
app.Stop()
wg.Wait()
// Output:
// division happened: 2
// multiplication happened: 12
// subscriber exited
}

View File

@@ -0,0 +1,56 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event_test
import (
"fmt"
"github.com/prysmaticlabs/prysm/async/event"
)
func ExampleNewSubscription() {
// Create a subscription that sends 10 integers on ch.
ch := make(chan int)
sub := event.NewSubscription(func(quit <-chan struct{}) error {
for i := 0; i < 10; i++ {
select {
case ch <- i:
case <-quit:
fmt.Println("unsubscribed")
return nil
}
}
return nil
})
// This is the consumer. It reads 5 integers, then aborts the subscription.
// Note that Unsubscribe waits until the producer has shut down.
for i := range ch {
fmt.Println(i)
if i == 4 {
sub.Unsubscribe()
break
}
}
// Output:
// 0
// 1
// 2
// 3
// 4
// unsubscribed
}

255
async/event/feed.go Normal file
View File

@@ -0,0 +1,255 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package event contains an event feed implementation for process communication.
package event
import (
"errors"
"reflect"
"sync"
)
var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")
// Feed implements one-to-many subscriptions where the carrier of events is a channel.
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
//
// Feeds can only be used with a single type. The type is determined by the first Send or
// Subscribe operation. Subsequent calls to these methods panic if the type does not
// match.
//
// The zero value is ready to use.
type Feed struct {
once sync.Once // ensures that init only runs once
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send
// The inbox holds newly subscribed channels until they are added to sendCases.
mu sync.Mutex
inbox caseList
etype reflect.Type
}
// This is the index of the first actual subscription channel in sendCases.
// sendCases[0] is a SelectRecv case for the removeSub channel.
const firstSubSendCase = 1
type feedTypeError struct {
got, want reflect.Type
op string
}
func (e feedTypeError) Error() string {
return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String()
}
func (f *Feed) init() {
f.removeSub = make(chan interface{})
f.sendLock = make(chan struct{}, 1)
f.sendLock <- struct{}{}
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}
// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
// until the subscription is canceled. All channels added must have the same element type.
//
// The channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
func (f *Feed) Subscribe(channel interface{}) Subscription {
f.once.Do(f.init)
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}
// note: callers must hold f.mu
func (f *Feed) typecheck(typ reflect.Type) bool {
if f.etype == nil {
f.etype = typ
return true
}
// In the event the feed's type is an actual interface, we
// perform an interface conformance check here.
if f.etype.Kind() == reflect.Interface && typ.Implements(f.etype) {
return true
}
return f.etype == typ
}
func (f *Feed) remove(sub *feedSub) {
// Delete from inbox first, which covers channels
// that have not been added to f.sendCases yet.
ch := sub.channel.Interface()
f.mu.Lock()
index := f.inbox.find(ch)
if index != -1 {
f.inbox = f.inbox.delete(index)
f.mu.Unlock()
return
}
f.mu.Unlock()
select {
case f.removeSub <- ch:
// Send will remove the channel from f.sendCases.
case <-f.sendLock:
// No Send is in progress, delete the channel now that we have the send lock.
f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
f.sendLock <- struct{}{}
}
}
// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
<-f.sendLock
// Add new cases from the inbox after taking the send lock.
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
// Set the sent value on all channels.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
// of sendCases. When a send succeeds, the corresponding case moves to the end of
// 'cases' and it shrinks by one element.
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)
i--
}
}
if len(cases) == firstSubSendCase {
break
}
// Select on all the receivers, waiting for them to unblock.
chosen, recv, _ := reflect.Select(cases)
if chosen == 0 /* <-f.removeSub */ {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
return nsent
}
type feedSub struct {
feed *Feed
channel reflect.Value
errOnce sync.Once
err chan error
}
// Unsubscribe remove feed subscription.
func (sub *feedSub) Unsubscribe() {
sub.errOnce.Do(func() {
sub.feed.remove(sub)
close(sub.err)
})
}
// Err returns error channel.
func (sub *feedSub) Err() <-chan error {
return sub.err
}
type caseList []reflect.SelectCase
// find returns the index of a case containing the given channel.
func (cs caseList) find(channel interface{}) int {
for i, cas := range cs {
if cas.Chan.Interface() == channel {
return i
}
}
return -1
}
// delete removes the given case from cs.
func (cs caseList) delete(index int) caseList {
return append(cs[:index], cs[index+1:]...)
}
// deactivate moves the case at index into the non-accessible portion of the cs slice.
func (cs caseList) deactivate(index int) caseList {
last := len(cs) - 1
cs[index], cs[last] = cs[last], cs[index]
return cs[:last]
}
// func (cs caseList) String() string {
// s := "["
// for i, cas := range cs {
// if i != 0 {
// s += ", "
// }
// switch cas.Dir {
// case reflect.SelectSend:
// s += fmt.Sprintf("%v<-", cas.Chan.Interface())
// case reflect.SelectRecv:
// s += fmt.Sprintf("<-%v", cas.Chan.Interface())
// }
// }
// return s + "]"
// }

507
async/event/feed_test.go Normal file
View File

@@ -0,0 +1,507 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
import (
"fmt"
"reflect"
"sync"
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestFeedPanics(t *testing.T) {
{
var f Feed
f.Send(2)
want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(0)}
assert.NoError(t, checkPanic(want, func() { f.Send(uint64(2)) }))
}
{
var f Feed
ch := make(chan int)
f.Subscribe(ch)
want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(0)}
assert.NoError(t, checkPanic(want, func() { f.Send(uint64(2)) }))
}
{
var f Feed
f.Send(2)
want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
assert.NoError(t, checkPanic(want, func() { f.Subscribe(make(chan uint64)) }))
}
{
var f Feed
assert.NoError(t, checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }))
}
{
var f Feed
assert.NoError(t, checkPanic(errBadChannel, func() { f.Subscribe(0) }))
}
}
func checkPanic(want error, fn func()) (err error) {
defer func() {
panicResult := recover()
if panicResult == nil {
err = fmt.Errorf("didn't panic")
} else if !reflect.DeepEqual(panicResult, want) {
err = fmt.Errorf("panicked with wrong error: got %q, want %q", panicResult, want)
}
}()
fn()
return nil
}
func TestFeed(t *testing.T) {
var feed Feed
var done, subscribed sync.WaitGroup
subscriber := func(i int) {
defer done.Done()
subchan := make(chan int)
sub := feed.Subscribe(subchan)
timeout := time.NewTimer(2 * time.Second)
subscribed.Done()
select {
case v := <-subchan:
if v != 1 {
t.Errorf("%d: received value %d, want 1", i, v)
}
case <-timeout.C:
t.Errorf("%d: receive timeout", i)
}
sub.Unsubscribe()
select {
case _, ok := <-sub.Err():
if ok {
t.Errorf("%d: error channel not closed after unsubscribe", i)
}
case <-timeout.C:
t.Errorf("%d: unsubscribe timeout", i)
}
}
const n = 1000
done.Add(n)
subscribed.Add(n)
for i := 0; i < n; i++ {
go subscriber(i)
}
subscribed.Wait()
if nsent := feed.Send(1); nsent != n {
t.Errorf("first send delivered %d times, want %d", nsent, n)
}
if nsent := feed.Send(2); nsent != 0 {
t.Errorf("second send delivered %d times, want 0", nsent)
}
done.Wait()
}
func TestFeedSubscribeSameChannel(t *testing.T) {
var (
feed Feed
done sync.WaitGroup
ch = make(chan int)
sub1 = feed.Subscribe(ch)
sub2 = feed.Subscribe(ch)
_ = feed.Subscribe(ch)
)
expectSends := func(value, n int) {
if nsent := feed.Send(value); nsent != n {
t.Errorf("send delivered %d times, want %d", nsent, n)
}
done.Done()
}
expectRecv := func(wantValue, n int) {
for i := 0; i < n; i++ {
if v := <-ch; v != wantValue {
t.Errorf("received %d, want %d", v, wantValue)
}
}
}
done.Add(1)
go expectSends(1, 3)
expectRecv(1, 3)
done.Wait()
sub1.Unsubscribe()
done.Add(1)
go expectSends(2, 2)
expectRecv(2, 2)
done.Wait()
sub2.Unsubscribe()
done.Add(1)
go expectSends(3, 1)
expectRecv(3, 1)
done.Wait()
}
func TestFeedSubscribeBlockedPost(_ *testing.T) {
var (
feed Feed
nsends = 2000
ch1 = make(chan int)
ch2 = make(chan int)
wg sync.WaitGroup
)
defer wg.Wait()
feed.Subscribe(ch1)
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
feed.Send(99)
wg.Done()
}()
}
sub2 := feed.Subscribe(ch2)
defer sub2.Unsubscribe()
// We're done when ch1 has received N times.
// The number of receives on ch2 depends on scheduling.
for i := 0; i < nsends; {
select {
case <-ch1:
i++
case <-ch2:
}
}
}
func TestFeedUnsubscribeBlockedPost(_ *testing.T) {
var (
feed Feed
nsends = 200
chans = make([]chan int, 2000)
subs = make([]Subscription, len(chans))
bchan = make(chan int)
bsub = feed.Subscribe(bchan)
wg sync.WaitGroup
)
for i := range chans {
chans[i] = make(chan int, nsends)
}
// Queue up some Sends. None of these can make progress while bchan isn't read.
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
feed.Send(99)
wg.Done()
}()
}
// Subscribe the other channels.
for i, ch := range chans {
subs[i] = feed.Subscribe(ch)
}
// Unsubscribe them again.
for _, sub := range subs {
sub.Unsubscribe()
}
// Unblock the Sends.
bsub.Unsubscribe()
wg.Wait()
}
// Checks that unsubscribing a channel during Send works even if that
// channel has already been sent on.
func TestFeedUnsubscribeSentChan(_ *testing.T) {
var (
feed Feed
ch1 = make(chan int)
ch2 = make(chan int)
sub1 = feed.Subscribe(ch1)
sub2 = feed.Subscribe(ch2)
wg sync.WaitGroup
)
defer sub2.Unsubscribe()
wg.Add(1)
go func() {
feed.Send(0)
wg.Done()
}()
// Wait for the value on ch1.
<-ch1
// Unsubscribe ch1, removing it from the send cases.
sub1.Unsubscribe()
// Receive ch2, finishing Send.
<-ch2
wg.Wait()
// Send again. This should send to ch2 only, so the wait group will unblock
// as soon as a value is received on ch2.
wg.Add(1)
go func() {
feed.Send(0)
wg.Done()
}()
<-ch2
wg.Wait()
}
func TestFeedUnsubscribeFromInbox(t *testing.T) {
var (
feed Feed
ch1 = make(chan int)
ch2 = make(chan int)
sub1 = feed.Subscribe(ch1)
sub2 = feed.Subscribe(ch1)
sub3 = feed.Subscribe(ch2)
)
assert.Equal(t, 3, len(feed.inbox))
assert.Equal(t, 1, len(feed.sendCases), "sendCases is non-empty after unsubscribe")
sub1.Unsubscribe()
sub2.Unsubscribe()
sub3.Unsubscribe()
assert.Equal(t, 0, len(feed.inbox), "Inbox is non-empty after unsubscribe")
assert.Equal(t, 1, len(feed.sendCases), "sendCases is non-empty after unsubscribe")
}
func BenchmarkFeedSend1000(b *testing.B) {
var (
done sync.WaitGroup
feed Feed
nsubs = 1000
)
subscriber := func(ch <-chan int) {
for i := 0; i < b.N; i++ {
<-ch
}
done.Done()
}
done.Add(nsubs)
for i := 0; i < nsubs; i++ {
ch := make(chan int, 200)
feed.Subscribe(ch)
go subscriber(ch)
}
// The actual benchmark.
b.ResetTimer()
for i := 0; i < b.N; i++ {
if feed.Send(i) != nsubs {
panic("wrong number of sends")
}
}
b.StopTimer()
done.Wait()
}
func TestFeed_Send(t *testing.T) {
tests := []struct {
name string
evFeed *Feed
testSetup func(fd *Feed, t *testing.T, o interface{})
obj interface{}
expectPanic bool
}{
{
name: "normal struct",
evFeed: new(Feed),
testSetup: func(fd *Feed, t *testing.T, o interface{}) {
testChan := make(chan testFeedWithPointer, 1)
fd.Subscribe(testChan)
},
obj: testFeedWithPointer{
a: new(uint64),
b: new(string),
},
expectPanic: false,
},
{
name: "un-implemented interface",
evFeed: new(Feed),
testSetup: func(fd *Feed, t *testing.T, o interface{}) {
testChan := make(chan testFeedIface, 1)
fd.Subscribe(testChan)
},
obj: testFeedWithPointer{
a: new(uint64),
b: new(string),
},
expectPanic: true,
},
{
name: "semi-implemented interface",
evFeed: new(Feed),
testSetup: func(fd *Feed, t *testing.T, o interface{}) {
testChan := make(chan testFeedIface, 1)
fd.Subscribe(testChan)
},
obj: testFeed2{
a: 0,
b: "",
c: []byte{'A'},
},
expectPanic: true,
},
{
name: "fully-implemented interface",
evFeed: new(Feed),
testSetup: func(fd *Feed, t *testing.T, o interface{}) {
testChan := make(chan testFeedIface)
// Make it unbuffered to allow message to
// pass through
go func() {
a := <-testChan
if !reflect.DeepEqual(a, o) {
t.Errorf("Got = %v, want = %v", a, o)
}
}()
fd.Subscribe(testChan)
},
obj: testFeed{
a: 0,
b: "",
},
expectPanic: false,
},
{
name: "fully-implemented interface with additional methods",
evFeed: new(Feed),
testSetup: func(fd *Feed, t *testing.T, o interface{}) {
testChan := make(chan testFeedIface)
// Make it unbuffered to allow message to
// pass through
go func() {
a := <-testChan
if !reflect.DeepEqual(a, o) {
t.Errorf("Got = %v, want = %v", a, o)
}
}()
fd.Subscribe(testChan)
},
obj: testFeed3{
a: 0,
b: "",
c: []byte{'A'},
d: []byte{'B'},
},
expectPanic: false,
},
{
name: "concrete types implementing the same interface",
evFeed: new(Feed),
testSetup: func(fd *Feed, t *testing.T, o interface{}) {
testChan := make(chan testFeed, 1)
// Make it unbuffered to allow message to
// pass through
go func() {
a := <-testChan
if !reflect.DeepEqual(a, o) {
t.Errorf("Got = %v, want = %v", a, o)
}
}()
fd.Subscribe(testChan)
},
obj: testFeed3{
a: 0,
b: "",
c: []byte{'A'},
d: []byte{'B'},
},
expectPanic: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
if !tt.expectPanic {
t.Errorf("panic triggered when unexpected: %v", r)
}
} else {
if tt.expectPanic {
t.Error("panic not triggered when expected")
}
}
}()
tt.testSetup(tt.evFeed, t, tt.obj)
if gotNsent := tt.evFeed.Send(tt.obj); gotNsent != 1 {
t.Errorf("Send() = %v, want %v", gotNsent, 1)
}
})
}
}
// The following objects below are a collection of different
// struct types to test with.
type testFeed struct {
a uint64
b string
}
func (testFeed) method1() {
}
func (testFeed) method2() {
}
type testFeedWithPointer struct {
a *uint64
b *string
}
type testFeed2 struct {
a uint64
b string
c []byte
}
func (testFeed2) method1() {
}
type testFeed3 struct {
a uint64
b string
c, d []byte
}
func (testFeed3) method1() {
}
func (testFeed3) method2() {
}
func (testFeed3) method3() {
}
type testFeedIface interface {
method1()
method2()
}

285
async/event/subscription.go Normal file
View File

@@ -0,0 +1,285 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
import (
"context"
"sync"
"time"
"github.com/prysmaticlabs/prysm/time/mclock"
)
// waitQuotient is divided against the max backoff time, in order to have N requests based on the full
// request backoff time.
const waitQuotient = 10
// Subscription represents a stream of events. The carrier of the events is typically a
// channel, but isn't part of the interface.
//
// Subscriptions can fail while established. Failures are reported through an error
// channel. It receives a value if there is an issue with the subscription (e.g. the
// network connection delivering the events has been closed). Only one value will ever be
// sent.
//
// The error channel is closed when the subscription ends successfully (i.e. when the
// source of events is closed). It is also closed when Unsubscribe is called.
//
// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
// cases to ensure that resources related to the subscription are released. It can be
// called any number of times.
type Subscription interface {
Err() <-chan error // returns the error channel
Unsubscribe() // cancels sending of events, closing the error channel
}
// NewSubscription runs a producer function as a subscription in a new goroutine. The
// channel given to the producer is closed when Unsubscribe is called. If fn returns an
// error, it is sent on the subscription's error channel.
func NewSubscription(producer func(<-chan struct{}) error) Subscription {
s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
go func() {
defer close(s.err)
err := producer(s.unsub)
s.mu.Lock()
defer s.mu.Unlock()
if !s.unsubscribed {
if err != nil {
s.err <- err
}
s.unsubscribed = true
}
}()
return s
}
type funcSub struct {
unsub chan struct{}
err chan error
mu sync.Mutex
unsubscribed bool
}
// Unsubscribe unsubscribes from subscription.
func (s *funcSub) Unsubscribe() {
s.mu.Lock()
if s.unsubscribed {
s.mu.Unlock()
return
}
s.unsubscribed = true
close(s.unsub)
s.mu.Unlock()
// Wait for producer shutdown.
<-s.err
}
// Err exposes error channel.
func (s *funcSub) Err() <-chan error {
return s.err
}
// Resubscribe calls fn repeatedly to keep a subscription established. When the
// subscription is established, Resubscribe waits for it to fail and calls fn again. This
// process repeats until Unsubscribe is called or the active subscription ends
// successfully.
//
// Resubscribe applies backoff between calls to fn. The time between calls is adapted
// based on the error rate, but will never exceed backoffMax.
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
s := &resubscribeSub{
waitTime: backoffMax / waitQuotient,
backoffMax: backoffMax,
fn: fn,
err: make(chan error),
unsub: make(chan struct{}),
}
go s.loop()
return s
}
// A ResubscribeFunc attempts to establish a subscription.
type ResubscribeFunc func(context.Context) (Subscription, error)
type resubscribeSub struct {
fn ResubscribeFunc
err chan error
unsub chan struct{}
unsubOnce sync.Once
lastTry mclock.AbsTime
waitTime, backoffMax time.Duration
}
// Unsubscribe unsubscribes from subscription.
func (s *resubscribeSub) Unsubscribe() {
s.unsubOnce.Do(func() {
s.unsub <- struct{}{}
<-s.err
})
}
// Err exposes error channel.
func (s *resubscribeSub) Err() <-chan error {
return s.err
}
func (s *resubscribeSub) loop() {
defer close(s.err)
var done bool
for !done {
sub := s.subscribe()
if sub == nil {
break
}
done = s.waitForError(sub)
sub.Unsubscribe()
}
}
func (s *resubscribeSub) subscribe() Subscription {
subscribed := make(chan error, 1)
var sub Subscription
retry:
for {
s.lastTry = mclock.Now()
ctx, cancel := context.WithCancel(context.TODO())
go func() {
rsub, err := s.fn(ctx)
sub = rsub
subscribed <- err
}()
select {
case err := <-subscribed:
cancel()
if err != nil {
// Subscribing failed, wait before launching the next try.
if s.backoffWait() {
return nil
}
continue retry
}
if sub == nil {
panic("event: ResubscribeFunc returned nil subscription and no error")
}
return sub
case <-s.unsub:
cancel()
return nil
}
}
}
func (s *resubscribeSub) waitForError(sub Subscription) bool {
defer sub.Unsubscribe()
select {
case err := <-sub.Err():
return err == nil
case <-s.unsub:
return true
}
}
func (s *resubscribeSub) backoffWait() bool {
if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
s.waitTime = s.backoffMax / waitQuotient
} else {
s.waitTime *= 2
if s.waitTime > s.backoffMax {
s.waitTime = s.backoffMax
}
}
t := time.NewTimer(s.waitTime)
defer t.Stop()
select {
case <-t.C:
return false
case <-s.unsub:
return true
}
}
// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
//
// For code that handle more than one subscription, a scope can be used to conveniently
// unsubscribe all of them with a single call. The example demonstrates a typical use in a
// larger program.
//
// The zero value is ready to use.
type SubscriptionScope struct {
mu sync.Mutex
subs map[*scopeSub]struct{}
closed bool
}
type scopeSub struct {
sc *SubscriptionScope
s Subscription
}
// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
// scope.
func (sc *SubscriptionScope) Track(s Subscription) Subscription {
sc.mu.Lock()
defer sc.mu.Unlock()
if sc.closed {
return nil
}
if sc.subs == nil {
sc.subs = make(map[*scopeSub]struct{})
}
ss := &scopeSub{sc, s}
sc.subs[ss] = struct{}{}
return ss
}
// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
// the tracked set. Calls to Track after Close return nil.
func (sc *SubscriptionScope) Close() {
sc.mu.Lock()
defer sc.mu.Unlock()
if sc.closed {
return
}
sc.closed = true
for s := range sc.subs {
s.s.Unsubscribe()
}
sc.subs = nil
}
// Count returns the number of tracked subscriptions.
// It is meant to be used for debugging.
func (sc *SubscriptionScope) Count() int {
sc.mu.Lock()
defer sc.mu.Unlock()
return len(sc.subs)
}
// Unsubscribe unsubscribes from subscription.
func (s *scopeSub) Unsubscribe() {
s.s.Unsubscribe()
s.sc.mu.Lock()
defer s.sc.mu.Unlock()
delete(s.sc.subs, s)
}
// Err exposes error channel.
func (s *scopeSub) Err() <-chan error {
return s.s.Err()
}

View File

@@ -0,0 +1,126 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
import (
"context"
"errors"
"runtime"
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
var errInts = errors.New("error in subscribeInts")
func subscribeInts(max, fail int, c chan<- int) Subscription {
return NewSubscription(func(quit <-chan struct{}) error {
for i := 0; i < max; i++ {
if i >= fail {
return errInts
}
select {
case c <- i:
case <-quit:
return nil
}
}
return nil
})
}
func TestNewSubscriptionError(t *testing.T) {
t.Parallel()
channel := make(chan int)
sub := subscribeInts(10, 2, channel)
loop:
for want := 0; want < 10; want++ {
select {
case got := <-channel:
require.Equal(t, want, got)
case err := <-sub.Err():
require.Equal(t, errInts, err)
require.Equal(t, 2, want)
break loop
}
}
sub.Unsubscribe()
err, ok := <-sub.Err()
require.NoError(t, err)
if ok {
t.Fatal("channel still open after Unsubscribe")
}
}
func TestResubscribe(t *testing.T) {
t.Parallel()
var i int
nfails := 6
sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) {
// fmt.Printf("call #%d @ %v\n", i, time.Now())
i++
if i == 2 {
// Delay the second failure a bit to reset the resubscribe interval.
time.Sleep(200 * time.Millisecond)
}
if i < nfails {
return nil, errors.New("oops")
}
sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil })
return sub, nil
})
<-sub.Err()
require.Equal(t, nfails, i)
}
func TestResubscribeAbort(t *testing.T) {
t.Parallel()
done := make(chan error)
sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
select {
case <-ctx.Done():
done <- nil
case <-time.After(2 * time.Second):
done <- errors.New("context given to resubscribe function not canceled within 2s")
}
return nil, nil
})
sub.Unsubscribe()
require.NoError(t, <-done)
}
func TestResubscribeNonBlocking(t *testing.T) {
done := make(chan struct{})
sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
<-done
return nil, nil
})
resub, ok := sub.(*resubscribeSub)
require.Equal(t, true, ok)
currNum := runtime.NumGoroutine()
resub.unsub <- struct{}{}
done <- struct{}{}
require.Equal(t, currNum-1, runtime.NumGoroutine())
}