aboutsummaryrefslogtreecommitdiffstats
path: root/event
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-12 20:04:38 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-10-12 21:22:03 +0800
commit402fd6e8c6a2e379351e0aae10a833fae6bcae6c (patch)
tree30ab93e93af7c70e5df213eb3665f51a293bc4a9 /event
parent315a422ba754eae10db21990a809f608f7af62d4 (diff)
downloadgo-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.gz
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.zst
go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.zip
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'event')
-rw-r--r--event/event.go37
-rw-r--r--event/event_test.go2
-rw-r--r--event/example_test.go2
3 files changed, 30 insertions, 11 deletions
diff --git a/event/event.go b/event/event.go
index ce74e5286..57dd52baa 100644
--- a/event/event.go
+++ b/event/event.go
@@ -22,14 +22,21 @@ import (
"fmt"
"reflect"
"sync"
+ "time"
)
+// Event is a time-tagged notification pushed to subscribers.
+type Event struct {
+ Time time.Time
+ Data interface{}
+}
+
// Subscription is implemented by event subscriptions.
type Subscription interface {
// Chan returns a channel that carries events.
// Implementations should return the same channel
// for any subsequent calls to Chan.
- Chan() <-chan interface{}
+ Chan() <-chan *Event
// Unsubscribe stops delivery of events to a subscription.
// The event channel is closed.
@@ -82,6 +89,10 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
func (mux *TypeMux) Post(ev interface{}) error {
+ event := &Event{
+ Time: time.Now(),
+ Data: ev,
+ }
rtyp := reflect.TypeOf(ev)
mux.mutex.RLock()
if mux.stopped {
@@ -91,7 +102,7 @@ func (mux *TypeMux) Post(ev interface{}) error {
subs := mux.subm[rtyp]
mux.mutex.RUnlock()
for _, sub := range subs {
- sub.deliver(ev)
+ sub.deliver(event)
}
return nil
}
@@ -143,6 +154,7 @@ func posdelete(slice []*muxsub, pos int) []*muxsub {
type muxsub struct {
mux *TypeMux
+ created time.Time
closeMu sync.Mutex
closing chan struct{}
closed bool
@@ -151,21 +163,22 @@ type muxsub struct {
// postC can be set to nil without affecting the return value of
// Chan.
postMu sync.RWMutex
- readC <-chan interface{}
- postC chan<- interface{}
+ readC <-chan *Event
+ postC chan<- *Event
}
func newsub(mux *TypeMux) *muxsub {
- c := make(chan interface{})
+ c := make(chan *Event)
return &muxsub{
mux: mux,
+ created: time.Now(),
readC: c,
postC: c,
closing: make(chan struct{}),
}
}
-func (s *muxsub) Chan() <-chan interface{} {
+func (s *muxsub) Chan() <-chan *Event {
return s.readC
}
@@ -189,11 +202,17 @@ func (s *muxsub) closewait() {
s.postMu.Unlock()
}
-func (s *muxsub) deliver(ev interface{}) {
+func (s *muxsub) deliver(event *Event) {
+ // Short circuit delivery if stale event
+ if s.created.After(event.Time) {
+ return
+ }
+ // Otherwise deliver the event
s.postMu.RLock()
+ defer s.postMu.RUnlock()
+
select {
- case s.postC <- ev:
+ case s.postC <- event:
case <-s.closing:
}
- s.postMu.RUnlock()
}
diff --git a/event/event_test.go b/event/event_test.go
index 465af38cd..323cfea49 100644
--- a/event/event_test.go
+++ b/event/event_test.go
@@ -37,7 +37,7 @@ func TestSub(t *testing.T) {
}()
ev := <-sub.Chan()
- if ev.(testEvent) != testEvent(5) {
+ if ev.Data.(testEvent) != testEvent(5) {
t.Errorf("Got %v (%T), expected event %v (%T)",
ev, ev, testEvent(5), testEvent(5))
}
diff --git a/event/example_test.go b/event/example_test.go
index d4642ef2f..29938e853 100644
--- a/event/example_test.go
+++ b/event/example_test.go
@@ -30,7 +30,7 @@ func ExampleTypeMux() {
sub := mux.Subscribe(someEvent{}, otherEvent{})
go func() {
for event := range sub.Chan() {
- fmt.Printf("Received: %#v\n", event)
+ fmt.Printf("Received: %#v\n", event.Data)
}
fmt.Println("done")
close(done)