aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <obscuren@users.noreply.github.com>2014-10-10 22:57:50 +0800
committerJeffrey Wilcke <obscuren@users.noreply.github.com>2014-10-10 22:57:50 +0800
commit6fec5bd32e64e15d16085591e732b26a243297fa (patch)
tree75269be1c098d1346fcb6460747969372b5f4cf3
parenta38dafcc57d296447db9748c8c85df6c58b243fb (diff)
parent44674cb96c64ddf9a8b3345f14329c030ecd4ed6 (diff)
downloaddexon-6fec5bd32e64e15d16085591e732b26a243297fa.tar.gz
dexon-6fec5bd32e64e15d16085591e732b26a243297fa.tar.zst
dexon-6fec5bd32e64e15d16085591e732b26a243297fa.zip
Merge pull request #56 from fjl/feature/raceless-eventer
Fix Eventer race
-rw-r--r--eventer/eventer.go12
-rw-r--r--eventer/eventer_test.go57
2 files changed, 60 insertions, 9 deletions
diff --git a/eventer/eventer.go b/eventer/eventer.go
index fb2f299a3..6e5ee2ec5 100644
--- a/eventer/eventer.go
+++ b/eventer/eventer.go
@@ -1,5 +1,7 @@
package eventer
+import "sync"
+
// Basic receiver interface.
type Receiver interface {
Send(Event)
@@ -27,17 +29,18 @@ type Event struct {
type Channels map[string][]Receiver
type EventMachine struct {
+ mu sync.RWMutex
channels Channels
}
func New() *EventMachine {
- return &EventMachine{
- channels: make(Channels),
- }
+ return &EventMachine{channels: make(Channels)}
}
func (self *EventMachine) add(typ string, r Receiver) {
+ self.mu.Lock()
self.channels[typ] = append(self.channels[typ], r)
+ self.mu.Unlock()
}
// Generalised methods for the known receiver types
@@ -64,11 +67,11 @@ func (self *EventMachine) RegisterFunc(typ string, f Function) {
func (self *EventMachine) Register(typ string) Channel {
c := make(Channel, 1)
self.add(typ, c)
-
return c
}
func (self *EventMachine) Post(typ string, data interface{}) {
+ self.mu.RLock()
if self.channels[typ] != nil {
ev := Event{typ, data}
for _, receiver := range self.channels[typ] {
@@ -76,4 +79,5 @@ func (self *EventMachine) Post(typ string, data interface{}) {
receiver.Send(ev)
}
}
+ self.mu.RUnlock()
}
diff --git a/eventer/eventer_test.go b/eventer/eventer_test.go
index b35267af6..a5db6d901 100644
--- a/eventer/eventer_test.go
+++ b/eventer/eventer_test.go
@@ -1,9 +1,13 @@
package eventer
-import "testing"
+import (
+ "math/rand"
+ "testing"
+ "time"
+)
func TestChannel(t *testing.T) {
- eventer := New(nil)
+ eventer := New()
c := make(Channel, 1)
eventer.RegisterChannel("test", c)
@@ -17,7 +21,7 @@ func TestChannel(t *testing.T) {
}
func TestFunction(t *testing.T) {
- eventer := New(nil)
+ eventer := New()
var data string
eventer.RegisterFunc("test", func(ev Event) {
@@ -31,7 +35,7 @@ func TestFunction(t *testing.T) {
}
func TestRegister(t *testing.T) {
- eventer := New(nil)
+ eventer := New()
c := eventer.Register("test")
eventer.Post("test", "hello world")
@@ -44,7 +48,7 @@ func TestRegister(t *testing.T) {
}
func TestOn(t *testing.T) {
- eventer := New(nil)
+ eventer := New()
c := make(Channel, 1)
eventer.On("test", c)
@@ -64,3 +68,46 @@ func TestOn(t *testing.T) {
t.Error("Expected function event with data 'hello world'. Got", data)
}
}
+
+func TestConcurrentUsage(t *testing.T) {
+ rand.Seed(time.Now().Unix())
+ eventer := New()
+ stop := make(chan struct{})
+ recv := make(chan int)
+ poster := func() {
+ for {
+ select {
+ case <-stop:
+ return
+ default:
+ eventer.Post("test", "hi")
+ }
+ }
+ }
+ listener := func(i int) {
+ time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
+ c := eventer.Register("test")
+ // wait for the first event
+ <-c
+ recv <- i
+ // keep receiving to prevent deadlock
+ for {
+ select {
+ case <-stop:
+ return
+ case <-c:
+ }
+ }
+ }
+
+ nlisteners := 200
+ go poster()
+ for i := 0; i < nlisteners; i++ {
+ go listener(i)
+ }
+ // wait until everyone has been served
+ for i := 0; i < nlisteners; i++ {
+ <-recv
+ }
+ close(stop)
+}