diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-12 20:04:38 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-12 21:22:03 +0800 |
commit | 402fd6e8c6a2e379351e0aae10a833fae6bcae6c (patch) | |
tree | 30ab93e93af7c70e5df213eb3665f51a293bc4a9 /event | |
parent | 315a422ba754eae10db21990a809f608f7af62d4 (diff) | |
download | go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.gz go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.tar.zst go-tangerine-402fd6e8c6a2e379351e0aae10a833fae6bcae6c.zip |
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'event')
-rw-r--r-- | event/event.go | 37 | ||||
-rw-r--r-- | event/event_test.go | 2 | ||||
-rw-r--r-- | event/example_test.go | 2 |
3 files changed, 30 insertions, 11 deletions
diff --git a/event/event.go b/event/event.go index ce74e5286..57dd52baa 100644 --- a/event/event.go +++ b/event/event.go @@ -22,14 +22,21 @@ import ( "fmt" "reflect" "sync" + "time" ) +// Event is a time-tagged notification pushed to subscribers. +type Event struct { + Time time.Time + Data interface{} +} + // Subscription is implemented by event subscriptions. type Subscription interface { // Chan returns a channel that carries events. // Implementations should return the same channel // for any subsequent calls to Chan. - Chan() <-chan interface{} + Chan() <-chan *Event // Unsubscribe stops delivery of events to a subscription. // The event channel is closed. @@ -82,6 +89,10 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { // Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. func (mux *TypeMux) Post(ev interface{}) error { + event := &Event{ + Time: time.Now(), + Data: ev, + } rtyp := reflect.TypeOf(ev) mux.mutex.RLock() if mux.stopped { @@ -91,7 +102,7 @@ func (mux *TypeMux) Post(ev interface{}) error { subs := mux.subm[rtyp] mux.mutex.RUnlock() for _, sub := range subs { - sub.deliver(ev) + sub.deliver(event) } return nil } @@ -143,6 +154,7 @@ func posdelete(slice []*muxsub, pos int) []*muxsub { type muxsub struct { mux *TypeMux + created time.Time closeMu sync.Mutex closing chan struct{} closed bool @@ -151,21 +163,22 @@ type muxsub struct { // postC can be set to nil without affecting the return value of // Chan. postMu sync.RWMutex - readC <-chan interface{} - postC chan<- interface{} + readC <-chan *Event + postC chan<- *Event } func newsub(mux *TypeMux) *muxsub { - c := make(chan interface{}) + c := make(chan *Event) return &muxsub{ mux: mux, + created: time.Now(), readC: c, postC: c, closing: make(chan struct{}), } } -func (s *muxsub) Chan() <-chan interface{} { +func (s *muxsub) Chan() <-chan *Event { return s.readC } @@ -189,11 +202,17 @@ func (s *muxsub) closewait() { s.postMu.Unlock() } -func (s *muxsub) deliver(ev interface{}) { +func (s *muxsub) deliver(event *Event) { + // Short circuit delivery if stale event + if s.created.After(event.Time) { + return + } + // Otherwise deliver the event s.postMu.RLock() + defer s.postMu.RUnlock() + select { - case s.postC <- ev: + case s.postC <- event: case <-s.closing: } - s.postMu.RUnlock() } diff --git a/event/event_test.go b/event/event_test.go index 465af38cd..323cfea49 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -37,7 +37,7 @@ func TestSub(t *testing.T) { }() ev := <-sub.Chan() - if ev.(testEvent) != testEvent(5) { + if ev.Data.(testEvent) != testEvent(5) { t.Errorf("Got %v (%T), expected event %v (%T)", ev, ev, testEvent(5), testEvent(5)) } diff --git a/event/example_test.go b/event/example_test.go index d4642ef2f..29938e853 100644 --- a/event/example_test.go +++ b/event/example_test.go @@ -30,7 +30,7 @@ func ExampleTypeMux() { sub := mux.Subscribe(someEvent{}, otherEvent{}) go func() { for event := range sub.Chan() { - fmt.Printf("Received: %#v\n", event) + fmt.Printf("Received: %#v\n", event.Data) } fmt.Println("done") close(done) |